This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d3ab43  Add hostname-verification at client tls connection (#1208)
8d3ab43 is described below

commit 8d3ab43cee86c9e49a54db13929a4ecb09e8152f
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Feb 9 17:43:23 2018 -0800

    Add hostname-verification at client tls connection (#1208)
    
    * Add hostname-verification at client tls connection
    
    * add httpclient dep with exclude all + add pem in apache-rat
    
    * add httpclient+commons-logging dep in client-shading and LICENSE
    
    * shade artifacts
    
    * fix: proxy send certs to client for host verification
---
 all/src/assemble/LICENSE.bin.txt                   |   2 +
 pom.xml                                            |  13 ++
 pulsar-broker-shaded/pom.xml                       |   6 +
 .../broker/service/PulsarChannelInitializer.java   |  14 ++
 .../AuthenticationTlsHostnameVerificationTest.java | 255 +++++++++++++++++++++
 .../tls/hn-verification/broker-cert.pem            |  82 +++++++
 .../tls/hn-verification/broker-key.pem             |  28 +++
 .../authentication/tls/hn-verification/cacert.pem  |  79 +++++++
 .../pulsar-client-kafka/pom.xml                    |   6 +
 pulsar-client-shaded/pom.xml                       |   6 +
 pulsar-client/pom.xml                              |  18 ++
 .../pulsar/client/api/ClientConfiguration.java     |  18 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  54 ++++-
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +
 .../proxy/server/ServiceChannelInitializer.java    |  14 ++
 .../server/ProxyWithProxyAuthorizationTest.java    |  51 ++++-
 16 files changed, 642 insertions(+), 6 deletions(-)

diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt
index 41f9000..a7e70de 100644
--- a/all/src/assemble/LICENSE.bin.txt
+++ b/all/src/assemble/LICENSE.bin.txt
@@ -332,6 +332,8 @@ The Apache Software License, Version 2.0
  * Jetty - org.eclipse.jetty-*.jar
  * SnakeYaml -- org.yaml-snakeyaml-*.jar
  * RocksDB - org.rocksdb.*.jar
+ * HttpClient - org.apache.httpcomponents.httpclient.jar
+ * CommonsLogging - commons-logging-*.jar
 
 BSD 3-clause "New" or "Revised" License
  * EA Agent Loader -- com.ea.agentloader-*.jar -- 
licenses/LICENSE-EA-Agent-Loader.txt
diff --git a/pom.xml b/pom.xml
index 27ff691..7320661 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,18 @@ flexible messaging model and an intuitive client 
API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>4.5.5</version>
+        <exclusions>
+          <exclusion>
+            <groupId>*</groupId>
+            <artifactId>*</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      
+      <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
         <version>6.13.1</version>
@@ -760,6 +772,7 @@ flexible messaging model and an intuitive client 
API.</description>
             <exclude>**/*.crt</exclude>
             <exclude>**/*.key</exclude>
             <exclude>**/*.csr</exclude>
+            <exclude>**/*.pem</exclude>
             <exclude>**/*.json</exclude>
             <exclude>**/*.htpasswd</exclude>
             <exclude>src/test/resources/athenz.conf.test</exclude>
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index fd3ff68..bda3037 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -104,6 +104,8 @@
                   <include>org.aspectj:*</include>
                   <include>com.ea.agentloader:*</include>
                   <include>com.wordnik:swagger-annotations</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -298,6 +300,10 @@
                   <pattern>com.wordnik</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.worknik</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index cd0415a..3138769 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker.service;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
@@ -68,6 +71,17 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
                     builder.trustManager(trustCertCollection);
                 }
             }
+            
+            ServiceConfiguration config = 
brokerService.pulsar().getConfiguration();
+            String certFilePath = config.getTlsCertificateFilePath();
+            String keyFilePath = config.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && 
StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new 
AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
+            
             SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
new file mode 100644
index 0000000..5ccfc14
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.util.PublicSuffixMatcher;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class AuthenticationTlsHostnameVerificationTest extends 
ProducerConsumerBase {
+    private static final Logger log = 
LoggerFactory.getLogger(AuthenticationTlsHostnameVerificationTest.class);
+
+    // Man-in-the-middle certificates: an impostor tries to act as a broker by sending its 
own valid certificate
+    private final String TLS_MIM_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/cacert.pem";
+    private final String TLS_MIM_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/broker-cert.pem";
+    private final String TLS_MIM_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/hn-verification/broker-key.pem";
+
+    private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
+
+    private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+
+    private final String BASIC_CONF_FILE_PATH = 
"./src/test/resources/authentication/basic/.htpasswd";
+
+    private final static String brokerHostName = "localhost";
+    private boolean hostnameVerificationEnabled = true;
+
+    protected void setup() throws Exception {
+        if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
+            conf.setAnonymousUserRole("anonymousUser");
+        }
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        conf.setTlsEnabled(true);
+        conf.setTlsAllowInsecureConnection(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("localhost");
+        superUserRoles.add("superUser");
+        superUserRoles.add("superUser2");
+        conf.setSuperUserRoles(superUserRoles);
+
+        
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_SERVER_KEY_FILE_PATH);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderBasic.class.getName());
+        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("use");
+
+        super.init();
+
+        setupClient();
+    }
+
+    protected void setupClient() throws Exception {
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        org.apache.pulsar.client.api.ClientConfiguration clientConf = new 
org.apache.pulsar.client.api.ClientConfiguration();
+        clientConf.setStatsInterval(0, TimeUnit.SECONDS);
+        clientConf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        clientConf.setTlsAllowInsecureConnection(true);
+        clientConf.setAuthentication(authTls);
+        clientConf.setUseTls(true);
+        
clientConf.setTlsHostnameVerificationEnable(hostnameVerificationEnabled);
+
+        admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
+        String lookupUrl;
+        lookupUrl = new URI("pulsar+ssl://" + brokerHostName + ":" + 
BROKER_PORT_TLS).toString();
+        pulsarClient = PulsarClient.create(lookupUrl, clientConf);
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (!methodName.equals("testDefaultHostVerifier")) {
+            super.internalCleanup();
+        }
+    }
+
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create 
producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificates with CN = "pulsar*.apache.com"
+     * 3. Client verifies the host-name and closes the connection and fails 
consumer creation
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test(dataProvider = "hostnameVerification")
+    public void testTlsSyncProducerAndConsumerWithInvalidBrokerHost(boolean 
hostnameVerificationEnabled)
+            throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        this.hostnameVerificationEnabled = hostnameVerificationEnabled;
+        // set up a broker cert whose CN = "pulsar*.apache.com" differs from the broker's 
hostname="localhost"
+        conf.setTlsTrustCertsFilePath(TLS_MIM_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_MIM_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_MIM_SERVER_KEY_FILE_PATH);
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_MIM_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic",
+                    "my-subscriber-name", conf);
+            if (hostnameVerificationEnabled) {
+                Assert.fail("Connection should be failed due to 
hostnameVerification enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (!hostnameVerificationEnabled) {
+                Assert.fail("Consumer should be created because 
hostnameverification is disabled");
+            }
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * It verifies that client performs host-verification in order to create 
producer/consumer.
+     * 
+     * <pre>
+     * 1. Client tries to connect to broker with hostname="localhost"
+     * 2. Broker sends x509 certificates with CN = "localhost"
+     * 3. Client verifies the host-name and continues
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testTlsSyncProducerAndConsumerCorrectBrokerHost() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // setup broker cert which has CN = "localhost"
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+
+        setup();
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        Consumer consumer = 
pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", 
"my-subscriber-name",
+                conf);
+
+        ProducerConfiguration producerConf = new ProducerConfiguration();
+
+        Producer producer = 
pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic", 
producerConf);
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * This test verifies {@link DefaultHostnameVerifier} behavior and gives 
a fair idea of the host-matching results
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testDefaultHostVerifier() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        Method matchIdentityStrict = 
DefaultHostnameVerifier.class.getDeclaredMethod("matchIdentityStrict",
+                String.class, String.class, PublicSuffixMatcher.class);
+        matchIdentityStrict.setAccessible(true);
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, "pulsar", 
"pulsar", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar.com", "pulsar", null));
+        Assert.assertTrue((boolean) matchIdentityStrict.invoke(null, 
"pulsar-broker1.com", "pulsar*.com", null));
+        // unmatched remainder: "-broker1." must not contain "."
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar-broker1.com", "pulsar*com", null));
+        Assert.assertFalse((boolean) matchIdentityStrict.invoke(null, 
"pulsar.com", "*", null));
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
new file mode 100644
index 0000000..ac9d51b
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem
@@ -0,0 +1,82 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:50
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:41 2018 GMT
+            Not After : Feb  9 01:11:41 2019 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, 
CN=pulsar*.apache.com
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:e8:bb:b6:87:37:6b:68:44:c9:d6:01:ba:a5:93:
+                    e4:5f:b1:0e:64:23:a9:7b:bd:c1:a6:a8:b8:b9:2c:
+                    c9:73:57:5a:41:89:db:01:64:30:06:dc:5b:4e:01:
+                    d3:02:73:86:d1:f9:c2:a2:5f:8c:c1:4c:00:bc:b1:
+                    bd:67:18:f6:88:ee:b6:72:be:37:18:2f:5d:c2:a1:
+                    30:20:02:38:2b:5e:a9:50:f2:c4:f7:23:74:ef:ad:
+                    4e:b1:25:f7:49:5e:8d:98:cd:2d:71:88:2c:73:df:
+                    eb:5c:2e:f0:5e:e6:15:1e:82:1e:94:33:15:f5:7b:
+                    65:9e:b2:78:89:7a:7f:b7:c1:6a:a3:a9:34:3c:96:
+                    32:2a:26:1d:67:d1:0a:80:1f:7c:95:34:c6:fb:ea:
+                    11:1c:53:86:81:04:bb:90:45:2b:4f:99:9c:72:f5:
+                    ec:86:4b:2f:7e:c3:65:6c:ac:e0:74:5f:35:4e:ee:
+                    3f:d0:82:2b:20:bb:80:65:3f:fe:78:96:42:19:35:
+                    e1:46:bd:d9:4e:b7:b8:95:5f:25:6b:a6:f2:e3:87:
+                    13:d3:29:11:c5:a2:84:bb:12:81:ea:15:60:2f:16:
+                    7e:f9:86:bc:e3:93:ed:d7:ec:5a:34:ae:4c:cd:00:
+                    40:dc:c6:e7:f6:19:ed:63:7f:8f:d0:dd:c5:11:9d:
+                    95:2d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Basic Constraints: 
+                CA:FALSE
+            Netscape Comment: 
+                OpenSSL Generated Certificate
+            X509v3 Subject Key Identifier: 
+                06:DC:92:77:64:D3:21:AB:08:F6:E4:0C:9A:47:3F:3A:8B:CB:E8:D8
+            X509v3 Authority Key Identifier: 
+                
keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+    Signature Algorithm: sha256WithRSAEncryption
+         70:0b:e4:07:45:98:d3:17:02:2f:44:ec:aa:41:2e:39:57:5e:
+         8a:e0:21:77:59:39:1d:66:c2:10:ea:ae:73:8a:50:94:5e:ad:
+         05:56:aa:8a:2f:87:44:09:cb:50:2c:5a:44:d1:99:fe:ee:5c:
+         82:fb:db:d4:5c:bd:56:dd:e6:37:87:0a:64:2c:85:19:dc:2d:
+         d1:22:00:91:53:5d:4c:f2:1c:4f:61:84:8e:77:e1:cc:9e:f8:
+         16:bb:15:b0:5a:f4:12:c7:b6:3b:28:cf:e3:95:9a:a8:68:ad:
+         02:7e:88:34:88:cd:31:d9:cd:17:8a:ef:5d:d5:40:c7:37:ca:
+         d0:38:35:46:d0:7d:f9:b6:85:f5:ef:9d:f3:05:9c:38:3f:67:
+         df:97:94:a8:81:5d:e3:70:ff:96:28:58:13:37:8a:3f:2a:b9:
+         6a:2a:c6:aa:89:16:91:9a:e7:9c:f3:72:36:74:de:46:7f:4f:
+         26:56:6e:05:47:99:ee:38:26:13:77:16:f5:07:cd:f1:69:6e:
+         08:c8:3b:ef:35:96:b3:b1:8e:87:eb:bd:da:02:b8:40:aa:e8:
+         16:11:80:98:81:77:5a:97:41:58:bd:01:50:4c:6c:c4:14:43:
+         d4:ac:c7:25:8b:df:a4:94:f5:29:12:72:56:8c:25:94:d8:8f:
+         c1:fa:4b:59
+-----BEGIN CERTIFICATE-----
+MIIDtjCCAp6gAwIBAgIJANiZ1c4n9b5QMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExNDFa
+Fw0xOTAyMDkwMTExNDFaMGIxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxGzAZBgNVBAMM
+EnB1bHNhciouYXBhY2hlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
+ggEBAOi7toc3a2hEydYBuqWT5F+xDmQjqXu9waaouLksyXNXWkGJ2wFkMAbcW04B
+0wJzhtH5wqJfjMFMALyxvWcY9ojutnK+NxgvXcKhMCACOCteqVDyxPcjdO+tTrEl
+90lejZjNLXGILHPf61wu8F7mFR6CHpQzFfV7ZZ6yeIl6f7fBaqOpNDyWMiomHWfR
+CoAffJU0xvvqERxThoEEu5BFK0+ZnHL17IZLL37DZWys4HRfNU7uP9CCKyC7gGU/
+/niWQhk14Ua92U63uJVfJWum8uOHE9MpEcWihLsSgeoVYC8WfvmGvOOT7dfsWjSu
+TM0AQNzG5/YZ7WN/j9DdxRGdlS0CAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgB
+hvhCAQ0EHxYdT3BlblNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYE
+FAbckndk0yGrCPbkDJpHPzqLy+jYMB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idas
+T2XyLgI5MA0GCSqGSIb3DQEBCwUAA4IBAQBwC+QHRZjTFwIvROyqQS45V16K4CF3
+WTkdZsIQ6q5zilCUXq0FVqqKL4dECctQLFpE0Zn+7lyC+9vUXL1W3eY3hwpkLIUZ
+3C3RIgCRU11M8hxPYYSOd+HMnvgWuxWwWvQSx7Y7KM/jlZqoaK0Cfog0iM0x2c0X
+iu9d1UDHN8rQODVG0H35toX1753zBZw4P2ffl5SogV3jcP+WKFgTN4o/KrlqKsaq
+iRaRmuec83I2dN5Gf08mVm4FR5nuOCYTdxb1B83xaW4IyDvvNZazsY6H673aArhA
+qugWEYCYgXdal0FYvQFQTGzEFEPUrMcli9+klPUpEnJWjCWU2I/B+ktZ
+-----END CERTIFICATE-----
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
new file mode 100644
index 0000000..b6bde08
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDou7aHN2toRMnW
+Abqlk+RfsQ5kI6l7vcGmqLi5LMlzV1pBidsBZDAG3FtOAdMCc4bR+cKiX4zBTAC8
+sb1nGPaI7rZyvjcYL13CoTAgAjgrXqlQ8sT3I3TvrU6xJfdJXo2YzS1xiCxz3+tc
+LvBe5hUegh6UMxX1e2WesniJen+3wWqjqTQ8ljIqJh1n0QqAH3yVNMb76hEcU4aB
+BLuQRStPmZxy9eyGSy9+w2VsrOB0XzVO7j/Qgisgu4BlP/54lkIZNeFGvdlOt7iV
+XyVrpvLjhxPTKRHFooS7EoHqFWAvFn75hrzjk+3X7Fo0rkzNAEDcxuf2Ge1jf4/Q
+3cURnZUtAgMBAAECggEBAKUj5V3HBlDDVtCjA3TQHyGDeim2YGGsgQen+wNyczOD
+zUhp8FvpYmbL34HXq4m2vfiql+AtmqviKTe7iyDnxq/datq6fE+N9KLRS1u7F242
+yj/lM7wFjckwGYF75h9Kl4DQPimsLZa/Ubtkly1PZ7bxL4+LPE6nE7FrBDrREGUq
+39bUGmMPXzLRxVSUdmLQIUsgLtuAOVfQB5qZ75zIUMmBhPhNhDgUv35cLxmgj5J8
+GPJxG21BBm88UYA+dhPLTAk+k3rLVKeZfXV75U0Zt04JHthhnFZ+/mJk8AD6c+jZ
+d2M1TdRSMkyTgd0DpN/bQiBvs+MK6dSkDJvYQOVGfQUCgYEA+7C1fNRQgeyJh5HJ
+waRr+9oKBLk1bTq5KaiMFF0SQo0rp5AShjG3ucTiKOBleUkiig/CpLH3CvToapq6
+uh8xLZm8Fz1AIwQ/qjRlVeNzNPCrstRk/BYgmQREr7kDH7RzvynJZYKdwpaJA3+4
+ICK/ES2FGcgNZahnm5brrCc/gxMCgYEA7LfnzWj0x5vCOlGSwo/LjFb9UgreJLQ9
+U1W/ACg9H5cp81AVTMRr9UsZOyaWJrdCTyfiQJOEZQ3YdwjBSr6f5vOxwqF68Mmi
+WG1PhP/kZsGI/cwlEA2odkoy/BGfxSMrfiCaxQNovG35agbRiJ5Awci2lOViPnvF
+HPKUULHpTr8CgYBbykVWAiReTcKWc5/OBEXxcsJmmJkYfesbe0GjB5JqPQvnr05i
+LG2hzWDhoXzAb+Ct0zOcVt8O2uSMRGPHDysjQ0bqfscOPjVtwHAYk7vnWcJ0lKtD
+mFpJE9ps759pB6mS1Q2C/NDGL5pGcWTYK3PdMumwzlm8cl9eyfqnLSUniwKBgQCO
+drfpJat7nkAsfP+IXKYyFgBrKeM7z8XAq7BB1fXDV2SF7MKE6wnWHJZYxQZE0rHz
+lZtTJfTeJJEMQpah90ug4TUwX6Lv20n7Uf4zmxXIyd06cWw01yN13X4Fuk2fhWUd
+iV3cCAs2rDEZIHVmdWefuL45qjuQQ0kD/PJKBmjVXQKBgQC2kaXVskAqZJwyfn5r
+g2hoRxjgv58UGyTsVwiwkSfoYQGdw1otO2zuyYbZZxGttMo1HkKTXFUNDELpiFXb
+5GcfT6xxssEH8zvh30M8rS0nF0AkMGZhxJxPdBnh5enwNg6glStcMY6ZaEDVz34k
+MAr7/FNPcrJt/EgvQ7PYj0/HVg==
+-----END PRIVATE KEY-----
diff --git 
a/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
new file mode 100644
index 0000000..4c98286
--- /dev/null
+++ 
b/pulsar-broker/src/test/resources/authentication/tls/hn-verification/cacert.pem
@@ -0,0 +1,79 @@
+Certificate:
+    Data:
+        Version: 3 (0x2)
+        Serial Number:
+            d8:99:d5:ce:27:f5:be:4f
+    Signature Algorithm: sha256WithRSAEncryption
+        Issuer: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Validity
+            Not Before: Feb  9 01:11:04 2018 GMT
+            Not After : Feb  8 01:11:04 2021 GMT
+        Subject: C=AU, ST=Some-State, O=Internet Widgits Pty Ltd, CN=testCA
+        Subject Public Key Info:
+            Public Key Algorithm: rsaEncryption
+                Public-Key: (2048 bit)
+                Modulus:
+                    00:cc:50:cd:b6:68:b2:e0:5f:bd:a5:4a:5c:17:bc:
+                    d8:b9:43:e6:22:9a:8a:2e:1b:87:13:b6:ca:59:7e:
+                    d7:ee:50:fe:ef:bf:ae:4d:cc:26:70:b4:27:03:64:
+                    36:73:d5:fd:2e:08:37:b2:2d:36:26:c8:e3:d3:9e:
+                    d3:37:0d:56:fa:a9:78:55:db:09:b3:21:b7:ac:c8:
+                    12:35:16:21:ed:a8:5e:4a:a4:e3:11:a0:67:ae:4c:
+                    5b:a7:15:ff:72:b1:7a:77:2b:ea:bd:3c:89:5c:40:
+                    ae:58:4d:69:56:d6:d9:50:42:e7:d7:b1:58:cc:c8:
+                    2a:84:b0:16:7c:3a:82:38:46:78:cc:4b:8a:db:ac:
+                    cc:4c:e1:a8:c2:d4:8f:b0:d9:dc:79:f8:70:28:8a:
+                    76:4f:dc:b1:09:a2:15:65:33:de:2a:2f:8e:27:7a:
+                    0b:93:6b:66:4b:e2:53:33:97:a2:26:bf:f3:b2:8a:
+                    f2:6c:5c:41:5b:1a:bb:12:6c:2f:f3:14:35:c4:40:
+                    4b:16:65:64:72:16:bf:a3:d6:1b:4d:9b:e6:12:cb:
+                    0a:c7:a9:01:f8:63:2b:b7:22:7a:fd:ef:6b:03:9e:
+                    e5:06:87:1d:a5:d5:11:4c:11:ae:55:62:11:f5:57:
+                    7b:21:51:77:8e:b8:cf:2f:7d:86:d6:38:d3:af:28:
+                    bc:8d
+                Exponent: 65537 (0x10001)
+        X509v3 extensions:
+            X509v3 Subject Key Identifier: 
+                62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+            X509v3 Authority Key Identifier: 
+                
keyid:62:6F:F8:A2:85:3C:5C:7E:94:CC:3E:89:D6:AC:4F:65:F2:2E:02:39
+
+            X509v3 Basic Constraints: 
+                CA:TRUE
+    Signature Algorithm: sha256WithRSAEncryption
+         01:5a:ff:b8:36:ff:0c:9c:12:cc:ad:b2:60:ac:3c:91:c1:04:
+         c0:6b:10:f6:e0:0b:1c:17:44:76:1b:5a:98:c5:33:a2:2c:c8:
+         bf:e7:f7:2b:b7:97:37:43:8c:e7:a4:77:5f:5d:48:f6:77:2d:
+         bb:e0:f9:02:9e:df:0b:71:63:fd:ff:63:f1:23:ec:ed:bc:ac:
+         ea:a8:52:60:a7:c8:b0:f9:f7:66:62:35:ab:72:32:9a:cf:7f:
+         cc:96:fe:3b:01:31:04:21:e9:da:76:d1:09:be:66:03:c8:14:
+         48:d0:ad:73:3a:16:98:72:d9:1e:98:57:9b:49:59:8b:9a:23:
+         a9:e6:66:e6:d0:bc:65:45:fa:eb:ce:5a:21:24:9c:15:99:b9:
+         f3:63:ef:0a:bb:68:4d:ee:2e:52:6a:a2:bc:77:79:be:36:b1:
+         b5:d8:01:c5:9b:37:b0:db:38:f0:0c:59:35:7f:0c:8b:bf:ec:
+         22:bc:dc:14:c8:01:31:4f:a1:0b:82:34:ba:0f:5b:93:2e:4c:
+         ee:20:72:31:30:b1:d9:2c:42:84:2a:4e:c5:ea:d8:af:f4:da:
+         dd:b5:c4:f2:b0:43:f1:c4:09:9f:3d:5e:44:9f:b3:52:9f:92:
+         fe:9d:e3:f4:5b:6f:38:7e:3a:11:5b:99:b8:22:fd:a7:72:5d:
+         40:7c:50:f8
+-----BEGIN CERTIFICATE-----
+MIIDfzCCAmegAwIBAgIJANiZ1c4n9b5PMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RDQTAeFw0xODAyMDkwMTExMDRa
+Fw0yMTAyMDgwMTExMDRaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0
+YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMM
+BnRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMxQzbZosuBf
+vaVKXBe82LlD5iKaii4bhxO2yll+1+5Q/u+/rk3MJnC0JwNkNnPV/S4IN7ItNibI
+49Oe0zcNVvqpeFXbCbMht6zIEjUWIe2oXkqk4xGgZ65MW6cV/3Kxencr6r08iVxA
+rlhNaVbW2VBC59exWMzIKoSwFnw6gjhGeMxLituszEzhqMLUj7DZ3Hn4cCiKdk/c
+sQmiFWUz3iovjid6C5NrZkviUzOXoia/87KK8mxcQVsauxJsL/MUNcRASxZlZHIW
+v6PWG02b5hLLCsepAfhjK7ciev3vawOe5QaHHaXVEUwRrlViEfVXeyFRd464zy99
+htY4068ovI0CAwEAAaNQME4wHQYDVR0OBBYEFGJv+KKFPFx+lMw+idasT2XyLgI5
+MB8GA1UdIwQYMBaAFGJv+KKFPFx+lMw+idasT2XyLgI5MAwGA1UdEwQFMAMBAf8w
+DQYJKoZIhvcNAQELBQADggEBAAFa/7g2/wycEsytsmCsPJHBBMBrEPbgCxwXRHYb
+WpjFM6IsyL/n9yu3lzdDjOekd19dSPZ3Lbvg+QKe3wtxY/3/Y/Ej7O28rOqoUmCn
+yLD592ZiNatyMprPf8yW/jsBMQQh6dp20Qm+ZgPIFEjQrXM6Fphy2R6YV5tJWYua
+I6nmZubQvGVF+uvOWiEknBWZufNj7wq7aE3uLlJqorx3eb42sbXYAcWbN7DbOPAM
+WTV/DIu/7CK83BTIATFPoQuCNLoPW5MuTO4gcjEwsdksQoQqTsXq2K/02t21xPKw
+Q/HECZ89XkSfs1Kfkv6d4/Rbbzh+OhFbmbgi/adyXUB8UPg=
+-----END CERTIFICATE-----
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml 
b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
index 986264d..7212c39 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml
@@ -125,6 +125,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -195,6 +197,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
               <filters>
                 <filter>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index c3239cc..3fbd95a 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -81,6 +81,8 @@
                   <include>org.apache.pulsar:pulsar-checksum</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -146,6 +148,10 @@
                   <pattern>com.yahoo.sketches</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 3c26a9e..fcce6c0 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -74,6 +74,24 @@
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
     </dependency>
+    
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+         </exclusion>
+      </exclusions>
+    </dependency>
+    
+    <!-- httpclient uses it for logging --> 
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1.1</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
index dd62728..9e4aece 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java
@@ -51,6 +51,7 @@ public class ClientConfiguration implements Serializable {
     private boolean useTls = false;
     private String tlsTrustCertsFilePath = "";
     private boolean tlsAllowInsecureConnection = false;
+    private boolean tlsHostnameVerificationEnable = false;
     private int concurrentLookupRequest = 50000;
     private int maxNumberOfRejectedRequestPerConnection = 50;
 
@@ -356,4 +357,21 @@ public class ClientConfiguration implements Serializable {
         this.maxNumberOfRejectedRequestPerConnection = 
maxNumberOfRejectedRequestPerConnection;
     }
 
+    public boolean isTlsHostnameVerificationEnable() {
+        return tlsHostnameVerificationEnable;
+    }
+
+    /**
+     * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. Server
+     * Identity hostname verification.
+     * 
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     * 
+     * @param tlsHostnameVerificationEnable
+     */
+    public void setTlsHostnameVerificationEnable(boolean 
tlsHostnameVerificationEnable) {
+        this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
+    }
+    
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 3f2d176..38e96ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,11 +29,14 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import javax.net.ssl.SSLSession;
+
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
@@ -51,16 +54,18 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
-import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 
 public class ClientCnx extends PulsarHandler {
 
@@ -87,6 +92,10 @@ public class ClientCnx extends PulsarHandler {
     private final long operationTimeoutMs;
 
     private String proxyToTargetBrokerAddress = null;
+    // Remote hostName with which client is connected
+    private String remoteHostName = null;
+    private boolean isTlsHostnameVerificationEnable;
+    private DefaultHostnameVerifier hostnameVerifier;
 
     enum State {
         None, SentConnectFrame, Ready, Failed
@@ -100,6 +109,8 @@ public class ClientCnx extends PulsarHandler {
         this.maxNumberOfRejectedRequestPerConnection = 
conf.getMaxNumberOfRejectedRequestPerConnection();
         this.operationTimeoutMs = conf.getOperationTimeoutMs();
         this.state = State.None;
+        this.isTlsHostnameVerificationEnable = 
conf.isTlsHostnameVerificationEnable();
+        this.hostnameVerifier = new DefaultHostnameVerifier();
     }
 
     @Override
@@ -179,6 +190,14 @@ public class ClientCnx extends PulsarHandler {
 
     @Override
     protected void handleConnected(CommandConnected connected) {
+        
+        if (isTlsHostnameVerificationEnable && remoteHostName != null && 
!verifyTlsHostName(remoteHostName, ctx)) {
+            // close the connection if host-verification failed with the broker
+            log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), 
remoteHostName);
+            ctx.close();
+            return;
+        }
+        
         checkArgument(state == State.SentConnectFrame);
 
         if (log.isDebugEnabled()) {
@@ -521,6 +540,35 @@ public class ClientCnx extends PulsarHandler {
         }
     }
 
+    /**
+     * verifies host name provided in x509 Certificate in tls session
+     * 
+     * it matches hostname with below scenarios
+     * 
+     * <pre>
+     *  1. Supports IPV4 and IPV6 host matching
+     *  2. Supports wild card matching for DNS-name
+     *  eg:
+     *     HostName                     CN           Result
+     * 1.  localhost                    localhost    PASS
+     * 2.  localhost                    local*       PASS
+     * 3.  pulsar1-broker.com           pulsar*.com  PASS
+     * </pre>
+     * 
+     * @param ctx
+     * @return true if hostname is verified else return false
+     */
+    private boolean verifyTlsHostName(String hostname, ChannelHandlerContext 
ctx) {
+        ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
+
+        SSLSession sslSession = null;
+        if (sslHandler != null) {
+            sslSession = ((SslHandler) sslHandler).engine().getSession();
+            return hostnameVerifier.verify(hostname, sslSession);
+        }
+        return false;
+    }
+
     void registerConsumer(final long consumerId, final ConsumerImpl consumer) {
         consumers.put(consumerId, consumer);
     }
@@ -542,6 +590,10 @@ public class ClientCnx extends PulsarHandler {
                 targetBrokerAddress.getPort());
     }
 
+     void setRemoteHostName(String remoteHostName) {
+        this.remoteHostName = remoteHostName;
+    }
+    
     private PulsarClientException getPulsarClientException(ServerError error, 
String errorMsg) {
         switch (error) {
         case AuthenticationError:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index ed3c184..f598abe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -194,6 +194,8 @@ public class ConnectionPool implements Closeable {
                 cnx.setTargetBroker(logicalAddress);
             }
 
+            cnx.setRemoteHostName(physicalAddress.getHostName());
+            
             cnx.connectionFuture().thenRun(() -> {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Connection handshake completed", 
cnx.channel());
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
index 86ebd37..e7b0be7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.proxy.server;
 
 import java.io.File;
+import java.security.cert.X509Certificate;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.impl.auth.AuthenticationDataTls;
 import org.apache.pulsar.common.api.PulsarDecoder;
 
 import io.netty.channel.ChannelInitializer;
@@ -58,7 +62,17 @@ public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel>
             builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
             SslContext sslCtx = 
builder.clientAuth(ClientAuth.OPTIONAL).build();
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            
+            String certFilePath = serviceConfig.getTlsCertificateFilePath();
+            String keyFilePath = serviceConfig.getTlsKeyFilePath();
+            if (StringUtils.isNotBlank(certFilePath) && 
StringUtils.isNotBlank(keyFilePath)) {
+                AuthenticationDataTls authTlsData = new 
AuthenticationDataTls(certFilePath, keyFilePath);
+                builder.keyManager(authTlsData.getTlsPrivateKey(),
+                        (X509Certificate[]) authTlsData.getTlsCertificates());
+            }
+            
         }
+        
         ch.pipeline().addLast("frameDecoder", new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ProxyConnection(proxyService));
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
index 558f5e0..1619e25 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithProxyAuthorizationTest.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -42,7 +40,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
@@ -50,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
@@ -75,6 +73,11 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
 
+    @DataProvider(name = "hostnameVerification")
+    public Object[][] codecProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -161,7 +164,7 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
         createAdminClient();
         final String proxyServiceUrl = "pulsar://localhost:" + 
proxyConfig.getServicePortTls();
         // create a client which connects to proxy over tls and pass authData
-        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl);
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, false);
 
         String namespaceName = "my-property/proxy-authorization/my-ns";
         
@@ -205,6 +208,43 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "hostnameVerification")
+    public void textProxyAuthorizationTlsHostVerification(boolean 
hostnameVerificationEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        createAdminClient();
+        final String proxyServiceUrl = "pulsar://localhost:" + 
proxyConfig.getServicePortTls();
+        // create a client which connects to proxy over tls and pass authData
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
hostnameVerificationEnabled);
+
+        String namespaceName = "my-property/proxy-authorization/my-ns";
+
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("proxy-authorization")));
+        admin.namespaces().createNamespace(namespaceName);
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        try {
+            Consumer consumer = 
proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
+                    "my-subscriber-name", conf);
+            if (hostnameVerificationEnabled) {
+                Assert.fail("Connection should be failed due to 
hostnameVerification enabled");
+            }
+        } catch (PulsarClientException e) {
+            if (!hostnameVerificationEnabled) {
+                Assert.fail("Consumer should be created because 
hostnameverification is disabled");
+            }
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
     protected final void createAdminClient() throws Exception {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_SUPERUSER_CLIENT_CERT_FILE_PATH);
@@ -221,7 +261,7 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
     }
     
-    private PulsarClient createPulsarClient(String proxyServiceUrl) throws 
PulsarClientException {
+    private PulsarClient createPulsarClient(String proxyServiceUrl, boolean 
hosnameVerificationEnabled) throws PulsarClientException {
         Map<String, String> authParams = Maps.newHashMap();
         authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
         authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
@@ -233,6 +273,7 @@ public class ProxyWithProxyAuthorizationTest extends ProducerConsumerBase {
         clientConf.setTlsAllowInsecureConnection(true);
         clientConf.setAuthentication(authTls);
         clientConf.setUseTls(true);
+        
clientConf.setTlsHostnameVerificationEnable(hosnameVerificationEnabled);
         return PulsarClient.create(proxyServiceUrl, clientConf);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
rdhaba...@apache.org.

Reply via email to