This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b557e24  PIP-91: Separate lookup timeout from operation timeout (#11627)
b557e24 is described below

commit b557e2479c70631fa0dc34606bc5492f73cfef96
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Aug 17 11:11:23 2021 +0100

    PIP-91: Separate lookup timeout from operation timeout (#11627)
    
    * PIP-91: Separate lookup timeout from operation timeout
    
    This patch contains a number of changes.
    
    TooManyRequests is retried for partition metadata and lookups
    
    Lookup timeout configuration has been added. By default it matches
    the operation timeout.
    
    Partition metadata timeout calculation has been fixed to calculate
    the elapsed time correctly.
    
    Small refactor on broker construction to allow a mocked ServerCnx
    implementation for testing. Unfortunately, the test takes over 50
    seconds, but this is unavoidable due to the fact that we're working
    with timeouts here.
    
    PulsarClientExceptions have been reworked to contain more
    context (remote/local/reqid) and any previous exceptions that
    triggered retries. The previous exceptions must be recorded
    manually, so for now this only applies to lookups on the consumer
    side.
    
    * Fixup for test failures
    
    BrokerClientIntegrationTest#testCloseConnectionOnBrokerRejectedRequest
    depended on TooManyRequests previously being fatal for partition
    metadata requests. Now that such requests are retried, that test was
    failing. It was a fragile test anyway, since it depended on thread
    interactions and timing. I've rewritten it to use the ServerCnx mock.
    It now actually tests what it should: that clients close the
    connection after the maximum number of rejected requests.
    
    The schema tests were failing because they expected a specific
    exception message, which has now been extended. I changed endsWith
    to contains.
    
    I also added Producer retries similar to the Consumer ones. I was
    going to do this as a follow-on PR, but decided to include it in this one.
    
    Co-authored-by: Ivan Kelly <ike...@splunk.com>
---
 .../org/apache/pulsar/broker/PulsarService.java    |   9 +-
 .../pulsar/broker/service/BrokerService.java       |  13 +-
 .../broker/service/PulsarChannelInitializer.java   |  15 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   7 +-
 .../loadbalance/LeaderElectionServiceTest.java     |   4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   | 140 +++++++---
 .../service/BrokerServiceThrottlingTest.java       | 108 ++++++--
 .../pulsar/client/api/MockBrokerService.java       |  12 +
 .../client/impl/BrokerClientIntegrationTest.java   |  70 +----
 .../apache/pulsar/client/impl/LookupRetryTest.java | 287 +++++++++++++++++++++
 .../SchemaTypeCompatibilityCheckTest.java          |  16 +-
 .../apache/pulsar/client/api/ClientBuilder.java    |  18 ++
 .../pulsar/client/api/PulsarClientException.java   | 149 ++++++++---
 .../pulsar/client/impl/ClientBuilderImpl.java      |   6 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  37 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  46 +++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  45 +++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  15 +-
 .../client/impl/conf/ClientConfigurationData.java  |  14 +
 .../apache/pulsar/common/protocol/Commands.java    |   2 +-
 20 files changed, 801 insertions(+), 212 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 685cf05..0c0cec7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
 import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable {
     private ProtocolHandlers protocolHandlers = null;
 
     private final ShutdownService shutdownService;
-    private final EventLoopGroup ioEventLoopGroup;
+    protected final EventLoopGroup ioEventLoopGroup;
 
     private MetricsGenerator metricsGenerator;
 
@@ -658,7 +659,7 @@ public class PulsarService implements AutoCloseable {
                 config, localMetadataStore, getZkClient(), bkClientFactory, 
ioEventLoopGroup
             );
 
-            this.brokerService = new BrokerService(this, ioEventLoopGroup);
+            this.brokerService = newBrokerService(this);
 
             // Start load management service (even if load balancing is 
disabled)
             this.loadManager.set(LoadManager.create(this));
@@ -1678,4 +1679,8 @@ public class PulsarService implements AutoCloseable {
                 || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
     }
 
+    @VisibleForTesting
+    protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
+        return new BrokerService(pulsar, ioEventLoopGroup);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4a2cad9..c0cd311 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
@@ -254,6 +255,7 @@ public class BrokerService implements Closeable {
     @Getter
     private final BundlesQuotas bundlesQuotas;
 
+    private PulsarChannelInitializer.Factory pulsarChannelInitFactory = 
PulsarChannelInitializer.DEFAULT_FACTORY;
     private Channel listenChannel;
     private Channel listenChannelTls;
 
@@ -410,7 +412,8 @@ public class BrokerService implements Closeable {
 
         ServiceConfiguration serviceConfig = pulsar.getConfiguration();
 
-        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
+        bootstrap.childHandler(
+                pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, 
false));
 
         Optional<Integer> port = serviceConfig.getBrokerServicePort();
         if (port.isPresent()) {
@@ -427,7 +430,8 @@ public class BrokerService implements Closeable {
         Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
         if (tlsPort.isPresent()) {
             ServerBootstrap tlsBootstrap = bootstrap.clone();
-            tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, 
true));
+            tlsBootstrap.childHandler(
+                    
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
             try {
                 listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
                         pulsar.getBindAddress(), tlsPort.get())).sync()
@@ -2647,4 +2651,9 @@ public class BrokerService implements Closeable {
     public long getPausedConnections() {
         return pausedConnections.longValue();
     }
+
+    @VisibleForTesting
+    public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
+        this.pulsarChannelInitFactory = factory;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index f5f2be9..1ebd5b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -129,7 +130,7 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         // ServerCnx ends up reading higher number of messages and broker can 
not throttle the messages by disabling
         // auto-read.
         ch.pipeline().addLast("flowController", new FlowControlHandler());
-        ServerCnx cnx = new ServerCnx(pulsar);
+        ServerCnx cnx = newServerCnx(pulsar);
         ch.pipeline().addLast("handler", cnx);
 
         connections.put(ch.remoteAddress(), cnx);
@@ -144,4 +145,16 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
             }
         });
     }
+
+    @VisibleForTesting
+    protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
+        return new ServerCnx(pulsar);
+    }
+
+    public interface Factory {
+        PulsarChannelInitializer newPulsarChannelInitializer(PulsarService 
pulsar, boolean enableTLS) throws Exception;
+    }
+
+    public static final Factory DEFAULT_FACTORY =
+        (pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index f14c6e1..37a7f3a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -280,13 +280,12 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
     }
 
     protected PulsarService startBroker(ServiceConfiguration conf) throws 
Exception {
-
         return startBrokerWithoutAuthorization(conf);
     }
 
     protected PulsarService 
startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
         conf.setBrokerShutdownTimeoutMs(0L);
-        PulsarService pulsar = spy(new PulsarService(conf));
+        PulsarService pulsar = spy(newPulsarService(conf));
         setupBrokerMocks(pulsar);
         beforePulsarStartMocks(pulsar);
         pulsar.start();
@@ -295,6 +294,10 @@ public abstract class MockedPulsarServiceBaseTest extends 
TestRetrySupport {
         return pulsar;
     }
 
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws 
Exception {
+        return new PulsarService(conf);
+    }
+
     protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
         // Override default providers with mocked ones
         
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index faf5125..d252b1c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -129,7 +129,9 @@ public class LeaderElectionServiceTest {
                     .create();
         } catch (PulsarClientException t) {
             Assert.assertTrue(t instanceof 
PulsarClientException.LookupException);
-            Assert.assertEquals(t.getMessage(), 
"java.lang.IllegalStateException: The leader election has not yet been 
completed!");
+            Assert.assertTrue(
+                    t.getMessage().contains(
+                            "java.lang.IllegalStateException: The leader 
election has not yet been completed!"));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index ea30d17..9fee5e6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -33,6 +33,9 @@ import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -79,13 +82,18 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -879,42 +887,114 @@ public class BrokerServiceTest extends BrokerTestBase {
      */
     @Test
     public void testLookupThrottlingForClientByClient() throws Exception {
+        // This test looks like it could be flakey, if the broker responds
+        // quickly enough, there may never be concurrency in requests
         final String topicName = "persistent://prop/ns-abc/newTopic";
 
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .maxConcurrentLookupRequests(1)
-                .maxLookupRequests(2)
-                .build();
+        PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
+        resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setConcurrentLookupRequest(1);
+        conf.setMaxLookupRequest(2);
+
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
+                new DefaultThreadFactory("test-pool", 
Thread.currentThread().isDaemon()));
+        long reqId = 0xdeadbeef;
+        try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
+            // for PMR
+            // 2 lookup will succeed
+            long reqId1 = reqId++;
+            ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, 
reqId1);
+            CompletableFuture<?> f1 = 
pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request1, 
reqId1));
+
+            long reqId2 = reqId++;
+            ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, 
reqId2);
+            CompletableFuture<?> f2 = 
pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request2, 
reqId2));
+
+            f1.get();
+            f2.get();
+
+            // 3 lookup will fail
+            long reqId3 = reqId++;
+            ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, 
reqId3);
+            f1 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request3, 
reqId3));
+
+            long reqId4 = reqId++;
+            ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, 
reqId4);
+            f2 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request4, 
reqId4));
+
+            long reqId5 = reqId++;
+            ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, 
reqId5);
+            CompletableFuture<?> f3 = 
pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request5, 
reqId5));
 
-        // 2 lookup will success.
-        try {
-            CompletableFuture<Consumer<byte[]>> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
-            CompletableFuture<Consumer<byte[]>> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
+            try {
+                f1.get();
+                f2.get();
+                f3.get();
+                fail("At least one should fail");
+            } catch (ExecutionException e) {
+                Throwable rootCause = e;
+                while (rootCause instanceof ExecutionException) {
+                    rootCause = rootCause.getCause();
+                }
+                if (!(rootCause instanceof
+                      
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
+                    throw e;
+                }
+            }
 
-            consumer1.get().close();
-            consumer2.get().close();
-        } catch (Exception e) {
-            fail("Subscribe should success with 2 requests");
-        }
+            // for Lookup
+            // 2 lookup will succeed
+            long reqId6 = reqId++;
+            ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
+            f1 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request6, 
reqId6));
+
+            long reqId7 = reqId++;
+            ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
+            f2 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request7, 
reqId7));
+
+            f1.get();
+            f2.get();
+
+            // 3 lookup will fail
+            long reqId8 = reqId++;
+            ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
+            f1 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request8, 
reqId8));
+
+            long reqId9 = reqId++;
+            ByteBuf request9 = Commands.newLookup(topicName, true, reqId9);
+            f2 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request9, 
reqId9));
+
+            long reqId10 = reqId++;
+            ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
+            f3 = pool.getConnection(resolver.resolveHost())
+                .thenCompose(clientCnx -> clientCnx.newLookup(request10, 
reqId10));
 
-        // 3 lookup will fail
-        try {
-            CompletableFuture<Consumer<byte[]>> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
-            CompletableFuture<Consumer<byte[]>> consumer2 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
-            CompletableFuture<Consumer<byte[]>> consumer3 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();
-
-            consumer1.get().close();
-            consumer2.get().close();
-            consumer3.get().close();
-            fail("It should fail as throttling should only receive 2 
requests");
-        } catch (Exception e) {
-            if (!(e.getCause() instanceof
-                
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
-                fail("Subscribe should fail with TooManyRequestsException");
+            try {
+                f1.get();
+                f2.get();
+                f3.get();
+                fail("At least one should fail");
+            } catch (ExecutionException e) {
+                Throwable rootCause = e;
+                while (rootCause instanceof ExecutionException) {
+                    rootCause = rootCause.getCause();
+                }
+                if (!(rootCause instanceof
+                      
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
+                    throw e;
+                }
             }
+
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
index 37436d7..16e3130 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java
@@ -21,27 +21,37 @@ package org.apache.pulsar.broker.service;
 import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import lombok.Cleanup;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.testng.annotations.AfterMethod;
@@ -148,32 +158,86 @@ public class BrokerServiceThrottlingTest extends 
BrokerTestBase {
             }
         }
 
-        List<Consumer<byte[]>> successfulConsumers = 
Collections.synchronizedList(Lists.newArrayList());
-        @Cleanup("shutdownNow")
+        PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
+        resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setConnectionsPerBroker(20);
+
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
+                new DefaultThreadFactory("test-pool", 
Thread.currentThread().isDaemon()));
         ExecutorService executor = Executors.newFixedThreadPool(10);
-        final int totalConsumers = 20;
-        CountDownLatch latch = new CountDownLatch(totalConsumers);
-        for (int i = 0; i < totalConsumers; i++) {
-            executor.execute(() -> {
+        try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
+            final int totalConsumers = 20;
+            List<Future<?>> futures = new ArrayList<>();
+
+            // test for partitionMetadataRequest
+            for (int i = 0; i < totalConsumers; i++) {
+                long reqId = 0xdeadbeef + i;
+                Future<?> f = executor.submit(() -> {
+                        ByteBuf request = 
Commands.newPartitionMetadataRequest(topicName, reqId);
+                        pool.getConnection(resolver.resolveHost())
+                            .thenCompose(clientCnx -> 
clientCnx.newLookup(request, reqId))
+                            .get();
+                        return null;
+                    });
+                futures.add(f);
+            }
+
+            int rejects = 0;
+            for (Future<?> f : futures) {
                 try {
-                    
successfulConsumers.add(pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub")
-                            
.subscriptionType(SubscriptionType.Shared).subscribe());
-                } catch (PulsarClientException.TooManyRequestsException e) {
-                    // ok
-                } catch (Exception e) {
-                    fail("it shouldn't failed");
+                    f.get();
+                } catch (ExecutionException e) {
+                    Throwable rootCause = e;
+                    while (rootCause instanceof ExecutionException) {
+                        rootCause = rootCause.getCause();
+                    }
+                    if (rootCause instanceof
+                        
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) {
+                        rejects++;
+                    } else {
+                        throw e;
+                    }
                 }
-                latch.countDown();
-            });
-        }
-        latch.await();
+            }
+            assertTrue(rejects > 0);
+            futures.clear();
+
+            // test for lookup
+            for (int i = 0; i < totalConsumers; i++) {
+                long reqId = 0xdeadfeef + i;
+                Future<?> f = executor.submit(() -> {
+                        ByteBuf request = Commands.newLookup(topicName, true, 
reqId);
+                        pool.getConnection(resolver.resolveHost())
+                            .thenCompose(clientCnx -> 
clientCnx.newLookup(request, reqId))
+                            .get();
+                        return null;
+                    });
+                futures.add(f);
+            }
 
-        for (Consumer<?> c : successfulConsumers) {
-            if (c != null) {
-                c.close();
+            rejects = 0;
+            for (Future<?> f : futures) {
+                try {
+                    f.get();
+                } catch (ExecutionException e) {
+                    Throwable rootCause = e;
+                    while (rootCause instanceof ExecutionException) {
+                        rootCause = rootCause.getCause();
+                    }
+                    if (rootCause instanceof
+                        
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) {
+                        rejects++;
+                    } else {
+                        throw e;
+                    }
+                }
             }
+            assertTrue(rejects > 0);
+        } finally {
+            executor.shutdownNow();
+            eventLoop.shutdownNow();
         }
-        assertNotEquals(successfulConsumers.size(), totalConsumers);
     }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index 33094ed..b16fac4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -58,6 +58,8 @@ import org.apache.pulsar.common.api.proto.CommandFlow;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
+import org.apache.pulsar.common.api.proto.CommandPing;
+import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.CommandProducer;
 import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -247,6 +249,16 @@ public class MockBrokerService {
             log.warn("Got exception", cause);
             ctx.close();
         }
+
+        @Override
+        final protected void handlePing(CommandPing ping) {
+            // Immediately reply success to ping requests
+            ctx.writeAndFlush(Commands.newPong());
+        }
+
+        @Override
+        final protected void handlePong(CommandPong pong) {
+        }
     }
 
     private final Server server;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index bcfd514..fb3c30b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -528,74 +528,6 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
     }
 
     /**
-     * <pre>
-     * Verifies: that client-cnx gets closed when server gives 
TooManyRequestException in certain time frame
-     * 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0
-     * 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100
-     * 3. create multiple producer and make lookup-requests simultaneously
-     * 4. Client1 receives TooManyLookupException and should close connection
-     * </pre>
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
-
-        final String topicName = "persistent://prop/usw/my-ns/newTopic";
-        final int maxConccurentLookupRequest = 
pulsar.getConfiguration().getMaxConcurrentLookupRequest();
-        final int concurrentLookupRequests = 20;
-        @Cleanup("shutdownNow")
-        ExecutorService executor = 
Executors.newFixedThreadPool(concurrentLookupRequests);
-        try {
-            stopBroker();
-            conf.setMaxConcurrentLookupRequest(1);
-            startBroker();
-            String lookupUrl = pulsar.getBrokerServiceUrl();
-
-            @Cleanup
-            PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
-                    .maxNumberOfRejectedRequestPerConnection(0).build();
-
-            @Cleanup
-            PulsarClient pulsarClient2 = 
PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS)
-                    
.ioThreads(concurrentLookupRequests).connectionsPerBroker(20).build();
-
-            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
-            ClientCnx cnx = producer.cnx();
-            assertTrue(cnx.channel().isActive());
-
-            final int totalProducer = 100;
-            CountDownLatch latch = new CountDownLatch(totalProducer * 2);
-            AtomicInteger failed = new AtomicInteger(0);
-            for (int i = 0; i < totalProducer; i++) {
-                executor.submit(() -> {
-                    
pulsarClient2.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
-                        if (e != null) {
-                            failed.set(1);
-                        }
-                        latch.countDown();
-                        return null;
-                    });
-                    
pulsarClient.newProducer().topic(topicName).createAsync().handle((ok, e) -> {
-                        if (e != null) {
-                            failed.set(1);
-                        }
-                        latch.countDown();
-                        return null;
-                    });
-                });
-
-            }
-
-            latch.await(10, TimeUnit.SECONDS);
-            // connection must be closed
-            assertEquals(failed.get(), 1);
-        } finally {
-            conf.setMaxConcurrentLookupRequest(maxConccurentLookupRequest);
-        }
-    }
-
-    /**
      * It verifies that broker throttles down configured concurrent topic 
loading requests
      *
      * <pre>
@@ -1021,4 +953,4 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
new file mode 100644
index 0000000..ee44a39
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
+import static 
org.apache.pulsar.common.protocol.Commands.newPartitionMetadataResponse;
+
+import com.google.common.collect.Sets;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarChannelInitializer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.common.api.proto.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class LookupRetryTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(LookupRetryTest.class);
+    private static final String subscription = "reader-sub";
+    // Number of ServerCnx instances the broker has created (presumably one per accepted client connection).
+    private final AtomicInteger connectionsCreated = new AtomicInteger(0);
+    // Scripted responses keyed by topic local name; see ErrorByTopicServerCnx#errorList for the format.
+    private final ConcurrentHashMap<String, Queue<LookupError>> failureMap = new ConcurrentHashMap<>();
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // TestNG runs all test methods on a single instance of this class by default, so clear the
+        // state left over from the previous method. Otherwise connection counts accumulate across
+        // tests, and a topic name reused by a later test finds its failure script already consumed.
+        connectionsCreated.set(0);
+        failureMap.clear();
+
+        super.internalSetup();
+
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+    }
+
+    /**
+     * Creates a PulsarService whose broker builds an {@link ErrorByTopicServerCnx} for every
+     * new connection, so lookup and partition-metadata responses can be scripted per topic.
+     */
+    @Override
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+        return new PulsarService(conf) {
+            @Override
+            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+                BrokerService broker = new BrokerService(this, ioEventLoopGroup);
+                broker.setPulsarChannelInitializerFactory(
+                        (_pulsar, tls) -> {
+                            return new PulsarChannelInitializer(_pulsar, tls) {
+                                @Override
+                                protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
+                                    connectionsCreated.incrementAndGet();
+                                    return new ErrorByTopicServerCnx(pulsar, failureMap);
+                                }
+                            };
+                        });
+                return broker;
+            }
+        };
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Creates a client whose lookup timeout (10s) is much longer than its operation timeout (1s),
+     * leaving room for lookups to be retried after individual requests time out.
+     */
+    PulsarClient newClient() throws Exception {
+        return PulsarClient.builder()
+            .serviceUrl(pulsar.getBrokerServiceUrl())
+            .connectionTimeout(2, TimeUnit.SECONDS)
+            .operationTimeout(1, TimeUnit.SECONDS)
+            .lookupTimeout(10, TimeUnit.SECONDS)
+            .build();
+    }
+
+    @Test
+    public void testGetPartitionedMetadataRetries() throws Exception {
+        try (PulsarClient client = newClient()) {
+            client.getPartitionsForTopic("TIMEOUT:2,OK:10").get();
+        }
+        try (PulsarClient client = newClient()) {
+            client.getPartitionsForTopic("TOO_MANY:2,OK:10").get();
+        }
+    }
+
+    @Test
+    public void testTimeoutRetriesOnPartitionMetadata() throws Exception {
+        try (PulsarClient client = newClient();
+             Reader<byte[]> reader = client
+                .newReader().topic("TIMEOUT:2,OK:3").startMessageId(MessageId.latest)
+                .startMessageIdInclusive().readerName(subscription).create()) {
+        }
+    }
+
+    @Test
+    public void testTooManyRetriesOnPartitionMetadata() throws Exception {
+        try (PulsarClient client = newClient();
+             Reader<byte[]> reader = client
+                .newReader().topic("TOO_MANY:2,OK:3").startMessageId(MessageId.latest)
+                .startMessageIdInclusive().readerName(subscription).create()) {
+        }
+    }
+
+    @Test
+    public void testTooManyOnLookup() throws Exception {
+        try (PulsarClient client = newClient();
+             Reader<byte[]> reader = client
+                .newReader().topic("OK:1,TOO_MANY:2,OK:3").startMessageId(MessageId.latest)
+                .startMessageIdInclusive().readerName(subscription).create()) {
+        }
+    }
+
+    @Test
+    public void testTimeoutOnLookup() throws Exception {
+        try (PulsarClient client = newClient();
+             Reader<byte[]> reader = client
+                .newReader().topic("OK:1,TIMEOUT:2,OK:3").startMessageId(MessageId.latest)
+                .startMessageIdInclusive().readerName(subscription).create()) {
+        }
+    }
+
+    @Test
+    public void testManyFailures() throws Exception {
+        try (PulsarClient client = newClient();
+             Reader<byte[]> reader = client
+                .newReader().topic("TOO_MANY:1,TIMEOUT:1,OK:1,TIMEOUT:1,TOO_MANY:1,OK:3")
+                .startMessageId(MessageId.latest)
+                .startMessageIdInclusive().readerName(subscription).create()) {
+        }
+    }
+
+    @Test
+    public void testProducerTimeoutOnPMR() throws Exception {
+        try (PulsarClient client = newClient();
+             Producer<byte[]> producer = client.newProducer().topic("TIMEOUT:2,OK:3").create()) {
+        }
+    }
+
+    @Test
+    public void testProducerTooManyOnPMR() throws Exception {
+        try (PulsarClient client = newClient();
+             Producer<byte[]> producer = client.newProducer().topic("TOO_MANY:2,OK:3").create()) {
+        }
+    }
+
+    @Test
+    public void testProducerTimeoutOnLookup() throws Exception {
+        try (PulsarClient client = newClient();
+             Producer<byte[]> producer = client.newProducer().topic("OK:1,TIMEOUT:2,OK:3").create()) {
+        }
+    }
+
+    @Test
+    public void testProducerTooManyOnLookup() throws Exception {
+        try (PulsarClient client = newClient();
+             Producer<byte[]> producer = client.newProducer().topic("OK:1,TOO_MANY:2,OK:3").create()) {
+        }
+    }
+
+    /**
+     * Verifies that a client closes its connection once the broker has answered more requests with
+     * TooManyRequests than {@code maxNumberOfRejectedRequestPerConnection} allows, while the
+     * operation itself is still retried and succeeds.
+     * <ul>
+     *   <li>Client 1 sets maxNumberOfRejectedRequestPerConnection=1: after two TooManyRequests
+     *   responses its connection is closed and a second connection is opened.</li>
+     *   <li>Client 2 sets maxNumberOfRejectedRequestPerConnection=100: neither two nor four
+     *   TooManyRequests responses close its single connection. Larger counts are not tested
+     *   because the exponential backoff between retries would make the test slow.</li>
+     * </ul>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {
+        String lookupUrl = pulsar.getBrokerServiceUrl();
+        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl)
+                .maxNumberOfRejectedRequestPerConnection(1).build()) {
+
+            // need 2 TooManyRequests because it takes the count before incrementing
+            pulsarClient.newProducer().topic("TOO_MANY:2").create().close();
+
+            Assert.assertEquals(connectionsCreated.get(), 2);
+        }
+
+        try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl)
+             .maxNumberOfRejectedRequestPerConnection(100).build()) {
+
+            pulsarClient.newProducer().topic("TOO_MANY:2").create().close();
+            pulsarClient.newProducer().topic("TOO_MANY:4").create().close();
+            // two connections from the first client plus a single one from this client
+            Assert.assertEquals(connectionsCreated.get(), 3);
+        }
+    }
+
+    enum LookupError {
+        UNKNOWN,
+        TOO_MANY,
+        TIMEOUT,
+        OK,
+    }
+
+    /**
+     * ServerCnx that answers lookup and partition-metadata requests according to a script encoded
+     * in the topic's local name, e.g. "OK:1,TOO_MANY:2,OK:3". Each request consumes one entry.
+     */
+    private static class ErrorByTopicServerCnx extends ServerCnx {
+        private final ConcurrentHashMap<String, Queue<LookupError>> failureMap;
+
+        ErrorByTopicServerCnx(PulsarService pulsar, ConcurrentHashMap<String, Queue<LookupError>> failureMap) {
+            super(pulsar);
+            this.failureMap = failureMap;
+        }
+
+        /**
+         * Returns the response queue for {@code topicName}, building it on first use by parsing the
+         * name as comma-separated {@code ERROR:count} pairs (at most 100 entries in total).
+         */
+        private Queue<LookupError> errorList(String topicName) {
+            return failureMap.computeIfAbsent(topicName, name -> {
+                Queue<LookupError> script = new ArrayBlockingQueue<LookupError>(100);
+                for (String entry : name.split(",")) {
+                    String[] parts = entry.split(":");
+                    LookupError error = LookupError.valueOf(parts[0]);
+                    int count = Integer.parseInt(parts[1]);
+                    for (int i = 0; i < count; i++) {
+                        script.add(error);
+                    }
+                }
+                return script;
+            });
+        }
+
+        /**
+         * TOO_MANY replies with TooManyRequests, TIMEOUT (and UNKNOWN) sends no reply at all, and
+         * OK or an exhausted script delegates to the real broker implementation.
+         */
+        @Override
+        protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
+            TopicName t = TopicName.get(partitionMetadata.getTopic());
+            LookupError error = errorList(t.getLocalName()).poll();
+            if (error == LookupError.TOO_MANY) {
+                final long requestId = partitionMetadata.getRequestId();
+                ctx.writeAndFlush(newPartitionMetadataResponse(ServerError.TooManyRequests, "too many", requestId));
+            } else if (error == LookupError.TIMEOUT) {
+                // do nothing, so the client's request times out
+            } else if (error == null || error == LookupError.OK) {
+                super.handlePartitionMetadataRequest(partitionMetadata);
+            }
+        }
+
+        /**
+         * TOO_MANY replies with TooManyRequests, TIMEOUT (and UNKNOWN) sends no reply at all, and
+         * OK or an exhausted script delegates to the real broker implementation.
+         */
+        @Override
+        protected void handleLookup(CommandLookupTopic lookup) {
+            TopicName t = TopicName.get(lookup.getTopic());
+            LookupError error = errorList(t.getLocalName()).poll();
+            if (error == LookupError.TOO_MANY) {
+                final long requestId = lookup.getRequestId();
+                ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, "too many", requestId));
+            } else if (error == LookupError.TIMEOUT) {
+                // do nothing, so the client's request times out
+            } else if (error == null || error == LookupError.OK) {
+                super.handleLookup(lookup);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
index 9b9c600..7fa08ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -97,7 +97,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .topic(topicName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
     }
 
     @Test
@@ -123,7 +123,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .subscriptionName(subName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
     }
 
     @Test
@@ -149,7 +149,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                     .topic(topicName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
     }
 
     @Test
@@ -178,7 +178,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .subscriptionName(subName + "2");
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
     }
 
     @Test
@@ -200,7 +200,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .topic(topicName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
     }
 
     @Test
@@ -226,7 +226,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .subscriptionName(subName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
     }
 
     @Test
@@ -252,7 +252,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .topic(topicName);
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
     }
 
     @Test
@@ -281,7 +281,7 @@ public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTes
                 .subscriptionName(subName + "2");
 
         Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
-        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+        assertTrue(t.getMessage().contains("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
     }
 
     @Test
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 2605afd..5840340 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -218,6 +218,24 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
 
     /**
+     * Set the lookup timeout <i>(default: matches operation timeout)</i>.
+     *
+     * <p>Lookup operations have a different load pattern from other operations. They can be handled by any broker,
+     * are not proportional to throughput, and are harmless to retry. Given this, it makes sense to allow them to
+     * retry for longer than normal operations, especially if they experience a timeout.
+     *
+     * <p>By default this is set to match the operation timeout, to maintain legacy behaviour. In practice, however,
+     * it should be set to 5-10x the operation timeout.
+     *
+     * @param lookupTimeout
+     *            lookup timeout
+     * @param unit
+     *            time unit for {@code lookupTimeout}
+     * @return the client builder instance
+     */
+    ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit);
+
+    /**
      * Set the number of threads to be used for handling connections to 
brokers <i>(default: 1 thread)</i>.
      *
      * @param numIoThreads the number of IO threads
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 8654203..5c0211f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.common.classification.InterfaceAudience;
@@ -32,6 +33,8 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
 @SuppressWarnings("serial")
 public class PulsarClientException extends IOException {
     private long sequenceId = -1;
+    private Collection<Throwable> previous;
+
     /**
      * Constructs an {@code PulsarClientException} with the specified detail 
message.
      *
@@ -84,6 +87,49 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Record the exceptions which occurred on earlier attempts of the same operation
+     * and caused it to be retried.
+     *
+     * <p>The collection is stored by reference, not copied.
+     *
+     * @param previous a collection of throwables that triggered retries, in the order they
+     *                 occurred; may be {@code null}
+     */
+    public void setPreviousExceptions(Collection<Throwable> previous) {
+        this.previous = previous;
+    }
+
+    /**
+     * Get the exceptions which caused earlier attempts of this operation to be retried.
+     *
+     * @return the previous exceptions, ordered as they occurred, or {@code null} if none
+     *         have been recorded
+     */
+    public Collection<Throwable> getPreviousExceptions() {
+        return this.previous;
+    }
+
+    /**
+     * Returns the standard exception description. When previous exceptions were recorded, it is
+     * followed by a JSON-style suffix of the form
+     * {@code {"previous":[{"attempt":0,"error":"..."},...]}}.
+     */
+    @Override
+    public String toString() {
+        if (previous == null || previous.isEmpty()) {
+            return super.toString();
+        }
+        StringBuilder sb = new StringBuilder(super.toString());
+        sb.append("{\"previous\":[");
+        int attempt = 0;
+        for (Throwable t : previous) {
+            if (attempt > 0) {
+                sb.append(',');
+            }
+            sb.append("{\"attempt\":").append(attempt++).append(",\"error\":\"");
+            // String.valueOf tolerates a null element instead of throwing NPE
+            appendJsonEscaped(sb, String.valueOf(t));
+            sb.append("\"}");
+        }
+        sb.append("]}");
+        return sb.toString();
+    }
+
+    /**
+     * Appends {@code s} to {@code sb}, escaping double quotes, backslashes and control
+     * characters so the text is valid inside a JSON string literal.
+     */
+    private static void appendJsonEscaped(StringBuilder sb, String s) {
+        for (int idx = 0; idx < s.length(); idx++) {
+            char c = s.charAt(idx);
+            switch (c) {
+                case '"':
+                    sb.append("\\\"");
+                    break;
+                case '\\':
+                    sb.append("\\\\");
+                    break;
+                case '\n':
+                    sb.append("\\n");
+                    break;
+                case '\r':
+                    sb.append("\\r");
+                    break;
+                case '\t':
+                    sb.append("\\t");
+                    break;
+                default:
+                    if (c < 0x20) {
+                        sb.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        sb.append(c);
+                    }
+            }
+        }
+    }
+
+    /**
      * Constructs an {@code PulsarClientException} with the specified cause.
      *
      * @param t
@@ -957,77 +1003,110 @@ public class PulsarClientException extends IOException {
         // site
         Throwable cause = t.getCause();
         String msg = cause.getMessage();
+        PulsarClientException newException = null;
         if (cause instanceof TimeoutException) {
-            return new TimeoutException(msg);
+            newException = new TimeoutException(msg);
         } else if (cause instanceof InvalidConfigurationException) {
-            return new InvalidConfigurationException(msg);
+            newException = new InvalidConfigurationException(msg);
         } else if (cause instanceof AuthenticationException) {
-            return new AuthenticationException(msg);
+            newException = new AuthenticationException(msg);
         } else if (cause instanceof IncompatibleSchemaException) {
-            return new IncompatibleSchemaException(msg);
+            newException = new IncompatibleSchemaException(msg);
         } else if (cause instanceof TooManyRequestsException) {
-            return new TooManyRequestsException(msg);
+            newException = new TooManyRequestsException(msg);
         } else if (cause instanceof LookupException) {
-            return new LookupException(msg);
+            newException = new LookupException(msg);
         } else if (cause instanceof ConnectException) {
-            return new ConnectException(msg);
+            newException = new ConnectException(msg);
         } else if (cause instanceof AlreadyClosedException) {
-            return new AlreadyClosedException(msg);
+            newException = new AlreadyClosedException(msg);
         } else if (cause instanceof TopicTerminatedException) {
-            return new TopicTerminatedException(msg);
+            newException = new TopicTerminatedException(msg);
         } else if (cause instanceof AuthorizationException) {
-            return new AuthorizationException(msg);
+            newException = new AuthorizationException(msg);
         } else if (cause instanceof GettingAuthenticationDataException) {
-            return new GettingAuthenticationDataException(msg);
+            newException = new GettingAuthenticationDataException(msg);
         } else if (cause instanceof UnsupportedAuthenticationException) {
-            return new UnsupportedAuthenticationException(msg);
+            newException = new UnsupportedAuthenticationException(msg);
         } else if (cause instanceof BrokerPersistenceException) {
-            return new BrokerPersistenceException(msg);
+            newException = new BrokerPersistenceException(msg);
         } else if (cause instanceof BrokerMetadataException) {
-            return new BrokerMetadataException(msg);
+            newException = new BrokerMetadataException(msg);
         } else if (cause instanceof ProducerBusyException) {
-            return new ProducerBusyException(msg);
+            newException = new ProducerBusyException(msg);
         } else if (cause instanceof ConsumerBusyException) {
-            return new ConsumerBusyException(msg);
+            newException = new ConsumerBusyException(msg);
         } else if (cause instanceof NotConnectedException) {
-            return new NotConnectedException();
+            newException = new NotConnectedException();
         } else if (cause instanceof InvalidMessageException) {
-            return new InvalidMessageException(msg);
+            newException = new InvalidMessageException(msg);
         } else if (cause instanceof InvalidTopicNameException) {
-            return new InvalidTopicNameException(msg);
+            newException = new InvalidTopicNameException(msg);
         } else if (cause instanceof NotSupportedException) {
-            return new NotSupportedException(msg);
+            newException = new NotSupportedException(msg);
         } else if (cause instanceof NotAllowedException) {
-            return new NotAllowedException(msg);
+            newException = new NotAllowedException(msg);
         } else if (cause instanceof ProducerQueueIsFullError) {
-            return new ProducerQueueIsFullError(msg);
+            newException = new ProducerQueueIsFullError(msg);
         } else if (cause instanceof ProducerBlockedQuotaExceededError) {
-            return new ProducerBlockedQuotaExceededError(msg);
+            newException = new ProducerBlockedQuotaExceededError(msg);
         } else if (cause instanceof ProducerBlockedQuotaExceededException) {
-            return new ProducerBlockedQuotaExceededException(msg);
+            newException = new ProducerBlockedQuotaExceededException(msg);
         } else if (cause instanceof ChecksumException) {
-            return new ChecksumException(msg);
+            newException = new ChecksumException(msg);
         } else if (cause instanceof CryptoException) {
-            return new CryptoException(msg);
+            newException = new CryptoException(msg);
         } else if (cause instanceof ConsumerAssignException) {
-            return new ConsumerAssignException(msg);
+            newException = new ConsumerAssignException(msg);
         } else if (cause instanceof MessageAcknowledgeException) {
-            return new MessageAcknowledgeException(msg);
+            newException = new MessageAcknowledgeException(msg);
         } else if (cause instanceof TransactionConflictException) {
-            return new TransactionConflictException(msg);
+            newException = new TransactionConflictException(msg);
         } else if (cause instanceof TopicDoesNotExistException) {
-            return new TopicDoesNotExistException(msg);
+            newException = new TopicDoesNotExistException(msg);
         } else if (cause instanceof ProducerFencedException) {
-            return new ProducerFencedException(msg);
+            newException = new ProducerFencedException(msg);
         } else if (cause instanceof MemoryBufferIsFullError) {
-            return new MemoryBufferIsFullError(msg);
+            newException = new MemoryBufferIsFullError(msg);
         } else if (cause instanceof NotFoundException) {
-            return new NotFoundException(msg);
+            newException = new NotFoundException(msg);
         } else {
-            return new PulsarClientException(t);
+            newException = new PulsarClientException(t);
         }
+
+        Collection<Throwable> previousExceptions = getPreviousExceptions(t);
+        if (previousExceptions != null) {
+            newException.setPreviousExceptions(previousExceptions);
+        }
+        return newException;
     }
 
+    /**
+     * Find the previous exceptions recorded on the first {@link PulsarClientException} in the
+     * cause chain of {@code t} (including {@code t} itself) that carries them.
+     *
+     * @param t the throwable to inspect; may be {@code null}
+     * @return the recorded previous exceptions, or {@code null} if none are found within the
+     *         first 20 links of the cause chain
+     */
+    public static Collection<Throwable> getPreviousExceptions(Throwable t) {
+        Throwable e = t;
+        for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) {
+            if (e instanceof PulsarClientException) {
+                Collection<Throwable> previous = ((PulsarClientException) e).getPreviousExceptions();
+                if (previous != null) {
+                    return previous;
+                }
+            }
+            // advance along the chain; re-reading t.getCause() would never get past the first cause
+            e = e.getCause();
+        }
+        return null;
+    }
+
+    /**
+     * Record {@code previous} on the first {@link PulsarClientException} in the cause chain of
+     * {@code t} (including {@code t} itself), searching at most 20 links. Does nothing if no
+     * such exception is found.
+     *
+     * @param t the throwable whose cause chain is searched; may be {@code null}
+     * @param previous the exceptions which triggered retries, in the order they occurred
+     */
+    public static void setPreviousExceptions(Throwable t, Collection<Throwable> previous) {
+        Throwable e = t;
+        for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) {
+            if (e instanceof PulsarClientException) {
+                ((PulsarClientException) e).setPreviousExceptions(previous);
+                return;
+            }
+            // advance along the chain; re-reading t.getCause() would never get past the first cause
+            e = e.getCause();
+        }
+    }
+
+
     public long getSequenceId() {
         return sequenceId;
     }
@@ -1059,4 +1138,4 @@ public class PulsarClientException extends IOException {
         }
         return true;
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index d7fa818..c8a4376 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -145,6 +145,12 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {
+        conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout));
+        return this;
+    }
+
+    @Override
     public ClientBuilder ioThreads(int numIoThreads) {
         conf.setNumIoThreads(numIoThreads);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index c36a7a6..23f06cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -140,6 +140,8 @@ public class ClientCnx extends PulsarHandler {
     private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new 
TlsHostnameVerifier();
 
     private ScheduledFuture<?> timeoutTask;
+    private SocketAddress localAddress;
+    private SocketAddress remoteAddress;
 
     // Added for mutual authentication.
     @Getter
@@ -208,6 +210,9 @@ public class ClientCnx extends PulsarHandler {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
+        this.localAddress = ctx.channel().localAddress();
+        this.remoteAddress = ctx.channel().remoteAddress();
+
         this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> 
checkRequestTimeout(), operationTimeoutMs,
                 operationTimeoutMs, TimeUnit.MILLISECONDS);
 
@@ -419,7 +424,8 @@ public class ClientCnx extends PulsarHandler {
                 completableFuture.complete(null);
             } else {
                 completableFuture.completeExceptionally(
-                        getPulsarClientException(ackResponse.getError(), 
ackResponse.getMessage()));
+                        getPulsarClientException(ackResponse.getError(),
+                                                 
buildError(ackResponse.getRequestId(), ackResponse.getMessage())));
             }
         } else {
             log.warn("AckResponse has complete when receive response! 
requestId : {}, consumerId : {}",
@@ -550,7 +556,8 @@ public class ClientCnx extends PulsarHandler {
                 if (lookupResult.hasError()) {
                     checkServerError(lookupResult.getError(), 
lookupResult.getMessage());
                     requestFuture.completeExceptionally(
-                            getPulsarClientException(lookupResult.getError(), 
lookupResult.getMessage()));
+                            getPulsarClientException(lookupResult.getError(),
+                                    buildError(lookupResult.getRequestId(), 
lookupResult.getMessage())));
                 } else {
                     requestFuture
                             .completeExceptionally(new 
PulsarClientException.LookupException("Empty lookup response"));
@@ -583,7 +590,8 @@ public class ClientCnx extends PulsarHandler {
             if (!lookupResult.hasResponse()
                     || 
CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse()))
 {
                 if (lookupResult.hasError()) {
-                    String message = lookupResult.hasMessage() ? 
lookupResult.getMessage() : null;
+                    String message = buildError(lookupResult.getRequestId(),
+                                                lookupResult.hasMessage() ? 
lookupResult.getMessage() : null);
                     checkServerError(lookupResult.getError(), message);
                     requestFuture.completeExceptionally(
                             getPulsarClientException(lookupResult.getError(), 
message));
@@ -692,7 +700,9 @@ public class ClientCnx extends PulsarHandler {
         }
         CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), 
error.getMessage()));
+            requestFuture.completeExceptionally(
+                    getPulsarClientException(error.getError(),
+                                             buildError(error.getRequestId(), 
error.getMessage())));
         } else {
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), error.getRequestId());
         }
@@ -889,7 +899,8 @@ public class ClientCnx extends PulsarHandler {
                     return CompletableFuture.completedFuture(Optional.empty());
                 } else {
                     return FutureUtil.failedFuture(
-                        getPulsarClientException(rc, 
commandGetSchemaResponse.getErrorMessage()));
+                        getPulsarClientException(rc,
+                                buildError(requestId, 
commandGetSchemaResponse.getErrorMessage())));
                 }
             } else {
                 return CompletableFuture.completedFuture(
@@ -913,7 +924,7 @@ public class ClientCnx extends PulsarHandler {
                     return 
CompletableFuture.completedFuture(SchemaVersion.Empty.bytes());
                 } else {
                     return FutureUtil.failedFuture(getPulsarClientException(
-                            rc, response.getErrorMessage()));
+                                                           rc, 
buildError(requestId, response.getErrorMessage())));
                 }
             } else {
                 return 
CompletableFuture.completedFuture(response.getSchemaVersion());
@@ -1082,6 +1093,14 @@ public class ClientCnx extends PulsarHandler {
         this.remoteHostName = remoteHostName;
     }
 
+    private String buildError(long requestId, String errorMsg) {
+        return new StringBuilder().append("{\"errorMsg\":\"").append(errorMsg)
+            .append("\",\"reqId\":").append(requestId)
+            .append(", \"remote\":\"").append(remoteAddress)
+            .append("\", \"local\":\"").append(localAddress)
+            .append("\"}").toString();
+    }
+
     public static PulsarClientException getPulsarClientException(ServerError 
error, String errorMsg) {
         switch (error) {
         case AuthenticationError:
@@ -1144,8 +1163,10 @@ public class ClientCnx extends PulsarHandler {
                     && !requestFuture.hasGotResponse()) {
                 pendingRequests.remove(request.requestId, requestFuture);
                 if (!requestFuture.isDone()) {
-                    String timeoutMessage = String.format("%d %s timedout 
after ms %d", request.requestId,
-                            request.requestType.getDescription(), 
operationTimeoutMs);
+                    String timeoutMessage = String.format(
+                            "%s timeout {'durationMs': '%d', 'reqId':'%d', 
'remote':'%s', 'local':'%s'}",
+                            request.requestType.getDescription(), 
operationTimeoutMs,
+                            request.requestId, remoteAddress, localAddress);
                     if (requestFuture.completeExceptionally(new 
TimeoutException(timeoutMessage))) {
                         log.warn("{} {}", ctx.channel(), timeoutMessage);
                     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 17d8043..9a9f0ec 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,12 +48,14 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -127,7 +129,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
     private volatile MessageId lastMessageIdInBroker = MessageId.earliest;
 
-    private final long subscribeTimeout;
+    private final long lookupDeadline;
+
+    @SuppressWarnings("rawtypes")
+    private static final AtomicLongFieldUpdater<ConsumerImpl> 
SUBSCRIBE_DEADLINE_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ConsumerImpl.class, "subscribeDeadline");
+    @SuppressWarnings("unused")
+    private volatile long subscribeDeadline = 0; // gets set on first 
successful connection
+
     private final int partitionIndex;
     private final boolean hasParentConsumer;
 
@@ -191,6 +200,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final ExecutorService internalPinnedExecutor;
+    private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
@@ -243,7 +253,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = 
startMessageRollbackDurationInSec;
         AVAILABLE_PERMITS_UPDATER.set(this, 0);
-        this.subscribeTimeout = System.currentTimeMillis() + 
client.getConfiguration().getOperationTimeoutMs();
+        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
         this.partitionIndex = partitionIndex;
         this.hasParentConsumer = hasParentConsumer;
         this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2;
@@ -688,6 +698,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        previousExceptions.clear();
+
         if (getState() == State.Closing || getState() == State.Closed) {
             setState(State.Closed);
             closeConsumerTasks();
@@ -706,6 +718,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             acknowledgmentsGroupingTracker.flushAndClean();
         }
 
+        SUBSCRIBE_DEADLINE_UPDATER
+            .compareAndSet(this, 0L, System.currentTimeMillis() + 
client.getConfiguration().getOperationTimeoutMs());
+
         int currentSize;
         synchronized (this) {
             currentSize = incomingMessages.size();
@@ -778,7 +793,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
             if (e.getCause() instanceof PulsarClientException
                     && PulsarClientException.isRetriableError(e.getCause())
-                    && System.currentTimeMillis() < subscribeTimeout) {
+                    && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
                 reconnectLater(e.getCause());
             } else if (!subscribeFuture.isDone()) {
                 // unable to create new consumer, fail operation
@@ -882,17 +897,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     @Override
     public void connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
-        boolean timeout = System.currentTimeMillis() > subscribeTimeout;
-        if ((nonRetriableError || timeout) && 
subscribeFuture.completeExceptionally(exception)) {
-            setState(State.Failed);
-            if (nonRetriableError) {
-                log.info("[{}] Consumer creation failed for consumer {} with 
unretriableError {}", topic, consumerId, exception);
-            } else {
-                log.info("[{}] Consumer creation failed for consumer {} after 
timeout", topic, consumerId);
+        boolean timeout = System.currentTimeMillis() > lookupDeadline;
+        if (nonRetriableError || timeout) {
+            exception.setPreviousExceptions(previousExceptions);
+            if (subscribeFuture.completeExceptionally(exception)) {
+                setState(State.Failed);
+                if (nonRetriableError) {
+                    log.info("[{}] Consumer creation failed for consumer {} 
with unretriableError {}", topic, consumerId, exception);
+                } else {
+                    log.info("[{}] Consumer creation failed for consumer {} 
after timeout", topic, consumerId);
+                }
+                closeConsumerTasks();
+                deregisterFromClientCnx();
+                client.cleanupConsumer(this);
             }
-            closeConsumerTasks();
-            deregisterFromClientCnx();
-            client.cleanupConsumer(this);
+        } else {
+            previousExceptions.add(exception);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 9513caa..f4f6c3c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -52,6 +52,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -102,7 +103,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     private final Queue<OpSendMsg> pendingMessages;
     private final Optional<Semaphore> semaphore;
     private volatile Timeout sendTimeout = null;
-    private long createProducerTimeout;
+    private final long lookupDeadline;
+
+    @SuppressWarnings("rawtypes")
+    private static final AtomicLongFieldUpdater<ProducerImpl> 
PRODUCER_DEADLINE_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ProducerImpl.class, "producerDeadline");
+    @SuppressWarnings("unused")
+    private volatile long producerDeadline = 0; // gets set on first 
successful connection
+
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = 
CompletableFuture.completedFuture(null);
 
@@ -140,6 +148,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     private ScheduledFuture<?> batchTimerTask;
 
     private Optional<Long> topicEpoch = Optional.empty();
+    private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
 
     @SuppressWarnings("rawtypes")
     private static final AtomicLongFieldUpdater<ProducerImpl> 
msgIdGeneratorUpdater = AtomicLongFieldUpdater
@@ -217,7 +226,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             sendTimeout = client.timer().newTimeout(this, 
conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
         }
 
-        this.createProducerTimeout = System.currentTimeMillis() + 
client.getConfiguration().getOperationTimeoutMs();
+        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
         if (conf.isBatchingEnabled()) {
             BatcherBuilder containerBuilder = conf.getBatcherBuilder();
             if (containerBuilder == null) {
@@ -1282,6 +1291,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     @Override
     public void connectionOpened(final ClientCnx cnx) {
+        previousExceptions.clear();
+
         // we set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating the
         // producer, it will try to grab a new cnx
         connectionHandler.setClientCnx(cnx);
@@ -1291,6 +1302,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         long requestId = client.newRequestId();
 
+        PRODUCER_DEADLINE_UPDATER
+            .compareAndSet(this, 0, System.currentTimeMillis() + 
client.getConfiguration().getOperationTimeoutMs());
+
         SchemaInfo schemaInfo = null;
         if (schema != null) {
             if (schema.getSchemaInfo() != null) {
@@ -1437,8 +1451,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
-                    (cause instanceof PulsarClientException && 
PulsarClientException.isRetriableError(cause)
-                            && System.currentTimeMillis() < 
createProducerTimeout)) {
+                               (cause instanceof PulsarClientException && 
PulsarClientException.isRetriableError(cause)
+                                && System.currentTimeMillis() < 
PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
                         // Either we had already created the producer once 
(producerCreatedFuture.isDone()) or we are
                         // still within the initial timeout budget and we are 
dealing with a retriable error
                         reconnectLater(cause);
@@ -1460,15 +1474,20 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     @Override
     public void connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
-        boolean producerTimeout = System.currentTimeMillis() > 
createProducerTimeout;
-        if ((nonRetriableError || producerTimeout) && 
producerCreatedFuture.completeExceptionally(exception)) {
-            if (nonRetriableError) {
-                log.info("[{}] Producer creation failed for producer {} with 
unretriableError = {}", topic, producerId, exception);
-            } else {
-                log.info("[{}] Producer creation failed for producer {} after 
producerTimeout", topic, producerId);
+        boolean timeout = System.currentTimeMillis() > lookupDeadline;
+        if (nonRetriableError || timeout) {
+            exception.setPreviousExceptions(previousExceptions);
+            if (producerCreatedFuture.completeExceptionally(exception)) {
+                if (nonRetriableError) {
+                    log.info("[{}] Producer creation failed for producer {} 
with unretriableError = {}", topic, producerId, exception);
+                } else {
+                    log.info("[{}] Producer creation failed for producer {} 
after producerTimeout", topic, producerId);
+                }
+                setState(State.Failed);
+                client.cleanupProducer(this);
             }
-            setState(State.Failed);
-            client.cleanupProducer(this);
+        } else {
+            previousExceptions.add(exception);
         }
     }
 
@@ -1877,4 +1896,4 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 894f67d..5b999a7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -882,13 +882,14 @@ public class PulsarClientImpl implements PulsarClient {
 
         try {
             TopicName topicName = TopicName.get(topic);
-            AtomicLong opTimeoutMs = new 
AtomicLong(conf.getOperationTimeoutMs());
+            AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs());
             Backoff backoff = new BackoffBuilder()
                     .setInitialTime(100, TimeUnit.MILLISECONDS)
                     .setMandatoryStop(opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS)
                     .setMax(1, TimeUnit.MINUTES)
                     .create();
-            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, 
metadataFuture);
+            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs,
+                                        metadataFuture, new ArrayList<>());
         } catch (IllegalArgumentException e) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(e.getMessage()));
         }
@@ -898,23 +899,27 @@ public class PulsarClientImpl implements PulsarClient {
     private void getPartitionedTopicMetadata(TopicName topicName,
                                              Backoff backoff,
                                              AtomicLong remainingTime,
-                                             
CompletableFuture<PartitionedTopicMetadata> future) {
+                                             
CompletableFuture<PartitionedTopicMetadata> future,
+                                             List<Throwable> 
previousExceptions) {
+        long startTime = System.nanoTime();
         
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e
 -> {
+            remainingTime.addAndGet(-1 * 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or 
server side which will lead to `TooManyRequestsException`
             boolean isLookupThrottling = 
!PulsarClientException.isRetriableError(e.getCause())
-                || e.getCause() instanceof 
PulsarClientException.TooManyRequestsException
                 || e.getCause() instanceof 
PulsarClientException.AuthenticationException;
             if (nextDelay <= 0 || isLookupThrottling) {
+                PulsarClientException.setPreviousExceptions(e, 
previousExceptions);
                 future.completeExceptionally(e);
                 return null;
             }
+            previousExceptions.add(e);
 
             ((ScheduledExecutorService) 
externalExecutorProvider.getExecutor()).schedule(() -> {
                 log.warn("[topic: {}] Could not get connection while 
getPartitionedTopicMetadata -- Will try again in {} ms",
                     topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
-                getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future);
+                getPartitionedTopicMetadata(topicName, backoff, remainingTime, 
future, previousExceptions);
             }, nextDelay, TimeUnit.MILLISECONDS);
             return null;
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 0723708..60b3370 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -96,6 +96,12 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
     private long operationTimeoutMs = 30000;
 
     @ApiModelProperty(
+            name = "lookupTimeoutMs",
+            value = "Client lookup timeout (in millisecond)."
+    )
+    private long lookupTimeoutMs = -1;
+
+    @ApiModelProperty(
             name = "statsIntervalSeconds",
             value = " Interval to print client stats (in second)."
     )
@@ -327,6 +333,14 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
         return false;
     }
 
+    public long getLookupTimeoutMs() {
+        if (lookupTimeoutMs >= 0) {
+            return lookupTimeoutMs;
+        } else {
+            return operationTimeoutMs;
+        }
+    }
+
     public ClientConfigurationData clone() {
         try {
             return (ClientConfigurationData) super.clone();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4484ab9..ea651ac 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1044,7 +1044,7 @@ public class Commands {
         serializedCmdPong.release();
     }
 
-    static ByteBuf newPong() {
+    public static ByteBuf newPong() {
         return cmdPong.retainedDuplicate();
     }
 

Reply via email to