This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b557e24 PIP-91: Separate lookup timeout from operation timeout (#11627) b557e24 is described below commit b557e2479c70631fa0dc34606bc5492f73cfef96 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue Aug 17 11:11:23 2021 +0100 PIP-91: Separate lookup timeout from operation timeout (#11627) * PIP-91: Separate lookup timeout from operation timeout This patch contains a number of changes. TooManyRequests is retried for partition metadata and lookups. Lookup timeout configuration has been added. By default it matches the operation timeout. Partition metadata timeout calculation has been fixed to calculate the elapsed time correctly. A small refactor of broker construction allows a mocked ServerCnx implementation to be used for testing. Unfortunately, the test takes over 50 seconds, but this is unavoidable because we're working with timeouts here. PulsarClientExceptions have been reworked to contain more context (remote/local/reqid) and any previous exceptions which may have triggered retries. The previous exceptions must be recorded manually, so for now this only applies to lookups on the consumer side. * Fixup for test failures BrokerClientIntegrationTest#testCloseConnectionOnBrokerRejectedRequest depended on the fact that TooManyRequests was previously fatal for partition metadata requests. Now that such requests are retried, that test was failing. It's a bad test anyhow, depending on thread interactions and whatnot. I've rewritten it to use the ServerCnx mock. It now actually tests what it should: that clients close the connection after the maximum number of rejected requests. The schema tests were failing because they expected a certain exception message which has been extended. I changed endsWith to contains. I also added Producer retries similar to the Consumer ones. I was going to do this as a follow-on PR, but decided to put it in this one. 
Co-authored-by: Ivan Kelly <ike...@splunk.com> --- .../org/apache/pulsar/broker/PulsarService.java | 9 +- .../pulsar/broker/service/BrokerService.java | 13 +- .../broker/service/PulsarChannelInitializer.java | 15 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 7 +- .../loadbalance/LeaderElectionServiceTest.java | 4 +- .../pulsar/broker/service/BrokerServiceTest.java | 140 +++++++--- .../service/BrokerServiceThrottlingTest.java | 108 ++++++-- .../pulsar/client/api/MockBrokerService.java | 12 + .../client/impl/BrokerClientIntegrationTest.java | 70 +---- .../apache/pulsar/client/impl/LookupRetryTest.java | 287 +++++++++++++++++++++ .../SchemaTypeCompatibilityCheckTest.java | 16 +- .../apache/pulsar/client/api/ClientBuilder.java | 18 ++ .../pulsar/client/api/PulsarClientException.java | 149 ++++++++--- .../pulsar/client/impl/ClientBuilderImpl.java | 6 + .../org/apache/pulsar/client/impl/ClientCnx.java | 37 ++- .../apache/pulsar/client/impl/ConsumerImpl.java | 46 +++- .../apache/pulsar/client/impl/ProducerImpl.java | 45 +++- .../pulsar/client/impl/PulsarClientImpl.java | 15 +- .../client/impl/conf/ClientConfigurationData.java | 14 + .../apache/pulsar/common/protocol/Commands.java | 2 +- 20 files changed, 801 insertions(+), 212 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 685cf05..0c0cec7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; 
+import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable { private ProtocolHandlers protocolHandlers = null; private final ShutdownService shutdownService; - private final EventLoopGroup ioEventLoopGroup; + protected final EventLoopGroup ioEventLoopGroup; private MetricsGenerator metricsGenerator; @@ -658,7 +659,7 @@ public class PulsarService implements AutoCloseable { config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup ); - this.brokerService = new BrokerService(this, ioEventLoopGroup); + this.brokerService = newBrokerService(this); // Start load management service (even if load balancing is disabled) this.loadManager.set(LoadManager.create(this)); @@ -1678,4 +1679,8 @@ public class PulsarService implements AutoCloseable { || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } + @VisibleForTesting + protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { + return new BrokerService(pulsar, ioEventLoopGroup); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4a2cad9..c0cd311 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import 
com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -254,6 +255,7 @@ public class BrokerService implements Closeable { @Getter private final BundlesQuotas bundlesQuotas; + private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY; private Channel listenChannel; private Channel listenChannelTls; @@ -410,7 +412,8 @@ public class BrokerService implements Closeable { ServiceConfiguration serviceConfig = pulsar.getConfiguration(); - bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); + bootstrap.childHandler( + pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false)); Optional<Integer> port = serviceConfig.getBrokerServicePort(); if (port.isPresent()) { @@ -427,7 +430,8 @@ public class BrokerService implements Closeable { Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls(); if (tlsPort.isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); - tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true)); + tlsBootstrap.childHandler( + pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true)); try { listenChannelTls = tlsBootstrap.bind(new InetSocketAddress( pulsar.getBindAddress(), tlsPort.get())).sync() @@ -2647,4 +2651,9 @@ public class BrokerService implements Closeable { public long getPausedConnections() { return pausedConnections.longValue(); } + + @VisibleForTesting + public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { + this.pulsarChannelInitFactory = factory; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index f5f2be9..1ebd5b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java 
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -129,7 +130,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ServerCnx cnx = new ServerCnx(pulsar); + ServerCnx cnx = newServerCnx(pulsar); ch.pipeline().addLast("handler", cnx); connections.put(ch.remoteAddress(), cnx); @@ -144,4 +145,16 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> } }); } + + @VisibleForTesting + protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception { + return new ServerCnx(pulsar); + } + + public interface Factory { + PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception; + } + + public static final Factory DEFAULT_FACTORY = + (pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index f14c6e1..37a7f3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -280,13 +280,12 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { } protected PulsarService startBroker(ServiceConfiguration conf) throws Exception { - 
return startBrokerWithoutAuthorization(conf); } protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception { conf.setBrokerShutdownTimeoutMs(0L); - PulsarService pulsar = spy(new PulsarService(conf)); + PulsarService pulsar = spy(newPulsarService(conf)); setupBrokerMocks(pulsar); beforePulsarStartMocks(pulsar); pulsar.start(); @@ -295,6 +294,10 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { return pulsar; } + protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception { + return new PulsarService(conf); + } + protected void setupBrokerMocks(PulsarService pulsar) throws Exception { // Override default providers with mocked ones doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index faf5125..d252b1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -129,7 +129,9 @@ public class LeaderElectionServiceTest { .create(); } catch (PulsarClientException t) { Assert.assertTrue(t instanceof PulsarClientException.LookupException); - Assert.assertEquals(t.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!"); + Assert.assertTrue( + t.getMessage().contains( + "java.lang.IllegalStateException: The leader election has not yet been completed!")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index ea30d17..9fee5e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java 
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -33,6 +33,9 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -79,13 +82,18 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -879,42 +887,114 @@ public class BrokerServiceTest extends BrokerTestBase { */ @Test public void testLookupThrottlingForClientByClient() throws Exception { + // This test looks like it could be flakey, if the broker responds + // quickly enough, there may never be concurrency in requests final String topicName = "persistent://prop/ns-abc/newTopic"; - @Cleanup - PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsar.getBrokerServiceUrl()) - .statsInterval(0, 
TimeUnit.SECONDS) - .maxConcurrentLookupRequests(1) - .maxLookupRequests(2) - .build(); + PulsarServiceNameResolver resolver = new PulsarServiceNameResolver(); + resolver.updateServiceUrl(pulsar.getBrokerServiceUrl()); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setConcurrentLookupRequest(1); + conf.setMaxLookupRequest(2); + + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false, + new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon())); + long reqId = 0xdeadbeef; + try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) { + // for PMR + // 2 lookup will succeed + long reqId1 = reqId++; + ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1); + CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1)); + + long reqId2 = reqId++; + ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2); + CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2)); + + f1.get(); + f2.get(); + + // 3 lookup will fail + long reqId3 = reqId++; + ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3); + f1 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3)); + + long reqId4 = reqId++; + ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4); + f2 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4)); + + long reqId5 = reqId++; + ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5); + CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5)); - // 2 lookup will success. 
- try { - CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync(); - CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync(); + try { + f1.get(); + f2.get(); + f3.get(); + fail("At least one should fail"); + } catch (ExecutionException e) { + Throwable rootCause = e; + while (rootCause instanceof ExecutionException) { + rootCause = rootCause.getCause(); + } + if (!(rootCause instanceof + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { + throw e; + } + } - consumer1.get().close(); - consumer2.get().close(); - } catch (Exception e) { - fail("Subscribe should success with 2 requests"); - } + // for Lookup + // 2 lookup will succeed + long reqId6 = reqId++; + ByteBuf request6 = Commands.newLookup(topicName, true, reqId6); + f1 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request6, reqId6)); + + long reqId7 = reqId++; + ByteBuf request7 = Commands.newLookup(topicName, true, reqId7); + f2 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7)); + + f1.get(); + f2.get(); + + // 3 lookup will fail + long reqId8 = reqId++; + ByteBuf request8 = Commands.newLookup(topicName, true, reqId8); + f1 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request8, reqId8)); + + long reqId9 = reqId++; + ByteBuf request9 = Commands.newLookup(topicName, true, reqId9); + f2 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request9, reqId9)); + + long reqId10 = reqId++; + ByteBuf request10 = Commands.newLookup(topicName, true, reqId10); + f3 = pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10)); - // 3 lookup will fail - try { - 
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync(); - CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync(); - CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync(); - - consumer1.get().close(); - consumer2.get().close(); - consumer3.get().close(); - fail("It should fail as throttling should only receive 2 requests"); - } catch (Exception e) { - if (!(e.getCause() instanceof - org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { - fail("Subscribe should fail with TooManyRequestsException"); + try { + f1.get(); + f2.get(); + f3.get(); + fail("At least one should fail"); + } catch (ExecutionException e) { + Throwable rootCause = e; + while (rootCause instanceof ExecutionException) { + rootCause = rootCause.getCause(); + } + if (!(rootCause instanceof + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) { + throw e; + } } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index 37436d7..16e3130 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -21,27 +21,37 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - 
+import io.netty.buffer.ByteBuf; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import lombok.Cleanup; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.testng.annotations.AfterMethod; @@ -148,32 +158,86 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase { } } - List<Consumer<byte[]>> successfulConsumers = Collections.synchronizedList(Lists.newArrayList()); - @Cleanup("shutdownNow") + PulsarServiceNameResolver resolver = new PulsarServiceNameResolver(); + resolver.updateServiceUrl(pulsar.getBrokerServiceUrl()); + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setConnectionsPerBroker(20); + + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false, + new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon())); ExecutorService executor = Executors.newFixedThreadPool(10); - final int totalConsumers = 20; - CountDownLatch latch = new 
CountDownLatch(totalConsumers); - for (int i = 0; i < totalConsumers; i++) { - executor.execute(() -> { + try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) { + final int totalConsumers = 20; + List<Future<?>> futures = new ArrayList<>(); + + // test for partitionMetadataRequest + for (int i = 0; i < totalConsumers; i++) { + long reqId = 0xdeadbeef + i; + Future<?> f = executor.submit(() -> { + ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId); + pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId)) + .get(); + return null; + }); + futures.add(f); + } + + int rejects = 0; + for (Future<?> f : futures) { try { - successfulConsumers.add(pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub") - .subscriptionType(SubscriptionType.Shared).subscribe()); - } catch (PulsarClientException.TooManyRequestsException e) { - // ok - } catch (Exception e) { - fail("it shouldn't failed"); + f.get(); + } catch (ExecutionException e) { + Throwable rootCause = e; + while (rootCause instanceof ExecutionException) { + rootCause = rootCause.getCause(); + } + if (rootCause instanceof + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) { + rejects++; + } else { + throw e; + } } - latch.countDown(); - }); - } - latch.await(); + } + assertTrue(rejects > 0); + futures.clear(); + + // test for lookup + for (int i = 0; i < totalConsumers; i++) { + long reqId = 0xdeadfeef + i; + Future<?> f = executor.submit(() -> { + ByteBuf request = Commands.newLookup(topicName, true, reqId); + pool.getConnection(resolver.resolveHost()) + .thenCompose(clientCnx -> clientCnx.newLookup(request, reqId)) + .get(); + return null; + }); + futures.add(f); + } - for (Consumer<?> c : successfulConsumers) { - if (c != null) { - c.close(); + rejects = 0; + for (Future<?> f : futures) { + try { + f.get(); + } catch (ExecutionException e) { + Throwable rootCause = e; + while (rootCause 
instanceof ExecutionException) { + rootCause = rootCause.getCause(); + } + if (rootCause instanceof + org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) { + rejects++; + } else { + throw e; + } + } } + assertTrue(rejects > 0); + } finally { + executor.shutdownNow(); + eventLoop.shutdownNow(); } - assertNotEquals(successfulConsumers.size(), totalConsumers); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index 33094ed..b16fac4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -58,6 +58,8 @@ import org.apache.pulsar.common.api.proto.CommandFlow; import org.apache.pulsar.common.api.proto.CommandLookupTopic; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.api.proto.CommandPing; +import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandProducer; import org.apache.pulsar.common.api.proto.CommandSend; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -247,6 +249,16 @@ public class MockBrokerService { log.warn("Got exception", cause); ctx.close(); } + + @Override + final protected void handlePing(CommandPing ping) { + // Immediately reply success to ping requests + ctx.writeAndFlush(Commands.newPong()); + } + + @Override + final protected void handlePong(CommandPong pong) { + } } private final Server server; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index bcfd514..fb3c30b 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -528,74 +528,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { } /** - * <pre> - * Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame - * 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0 - * 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100 - * 3. create multiple producer and make lookup-requests simultaneously - * 4. Client1 receives TooManyLookupException and should close connection - * </pre> - * - * @throws Exception - */ - @Test - public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { - - final String topicName = "persistent://prop/usw/my-ns/newTopic"; - final int maxConccurentLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest(); - final int concurrentLookupRequests = 20; - @Cleanup("shutdownNow") - ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests); - try { - stopBroker(); - conf.setMaxConcurrentLookupRequest(1); - startBroker(); - String lookupUrl = pulsar.getBrokerServiceUrl(); - - @Cleanup - PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS) - .maxNumberOfRejectedRequestPerConnection(0).build(); - - @Cleanup - PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl(lookupUrl).statsInterval(0, TimeUnit.SECONDS) - .ioThreads(concurrentLookupRequests).connectionsPerBroker(20).build(); - - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); - ClientCnx cnx = producer.cnx(); - assertTrue(cnx.channel().isActive()); - - final int totalProducer = 100; - CountDownLatch latch = new CountDownLatch(totalProducer * 2); - AtomicInteger failed = new AtomicInteger(0); - for 
(int i = 0; i < totalProducer; i++) { - executor.submit(() -> { - pulsarClient2.newProducer().topic(topicName).createAsync().handle((ok, e) -> { - if (e != null) { - failed.set(1); - } - latch.countDown(); - return null; - }); - pulsarClient.newProducer().topic(topicName).createAsync().handle((ok, e) -> { - if (e != null) { - failed.set(1); - } - latch.countDown(); - return null; - }); - }); - - } - - latch.await(10, TimeUnit.SECONDS); - // connection must be closed - assertEquals(failed.get(), 1); - } finally { - conf.setMaxConcurrentLookupRequest(maxConccurentLookupRequest); - } - } - - /** * It verifies that broker throttles down configured concurrent topic loading requests * * <pre> @@ -1021,4 +953,4 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { consumer.close(); producer.close(); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java new file mode 100644 index 0000000..ee44a39 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; +import static org.apache.pulsar.common.protocol.Commands.newPartitionMetadataResponse; + +import com.google.common.collect.Sets; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarChannelInitializer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.api.proto.CommandLookupTopic; +import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class LookupRetryTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(LookupRetryTest.class); + private static final String subscription = "reader-sub"; +
private final AtomicInteger connectionsCreated = new AtomicInteger(0); + private final ConcurrentHashMap<String, Queue<LookupError>> failureMap = new ConcurrentHashMap<>(); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + connectionsCreated.set(0); // the same test instance runs every test method, so reset per-test counters + failureMap.clear(); // several tests share topic specs; start each test with freshly parsed error queues + admin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default", Sets.newHashSet("test")); + } + + @Override + protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception { + return new PulsarService(conf) { + @Override + protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { + BrokerService broker = new BrokerService(this, ioEventLoopGroup); + broker.setPulsarChannelInitializerFactory( + (_pulsar, tls) -> { + return new PulsarChannelInitializer(_pulsar, tls) { + @Override + protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception { + connectionsCreated.incrementAndGet(); + return new ErrorByTopicServerCnx(pulsar, failureMap); + } + }; + }); + return broker; + } + }; + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + PulsarClient newClient() throws Exception { + return PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .connectionTimeout(2, TimeUnit.SECONDS) + .operationTimeout(1, TimeUnit.SECONDS) + .lookupTimeout(10, TimeUnit.SECONDS) + .build(); + } + + @Test + public void testGetPartitionedMetadataRetries() throws Exception { + try (PulsarClient client = newClient()) { + client.getPartitionsForTopic("TIMEOUT:2,OK:10").get(); + } + try (PulsarClient client = newClient()) { + client.getPartitionsForTopic("TOO_MANY:2,OK:10").get(); + } + } + + @Test + public void testTimeoutRetriesOnPartitionMetadata()
throws Exception { + try (PulsarClient client = newClient(); + Reader<byte[]> reader = client + .newReader().topic("TIMEOUT:2,OK:3").startMessageId(MessageId.latest) + .startMessageIdInclusive().readerName(subscription).create()) { + } + } + + @Test + public void testTooManyRetriesOnPartitionMetadata() throws Exception { + try (PulsarClient client = newClient(); + Reader<byte[]> reader = client + .newReader().topic("TOO_MANY:2,OK:3").startMessageId(MessageId.latest) + .startMessageIdInclusive().readerName(subscription).create()) { + } + } + + @Test + public void testTooManyOnLookup() throws Exception { + try (PulsarClient client = newClient(); + Reader<byte[]> reader = client + .newReader().topic("OK:1,TOO_MANY:2,OK:3").startMessageId(MessageId.latest) + .startMessageIdInclusive().readerName(subscription).create()) { + } + } + + @Test + public void testTimeoutOnLookup() throws Exception { + try (PulsarClient client = newClient(); + Reader<byte[]> reader = client + .newReader().topic("OK:1,TIMEOUT:2,OK:3").startMessageId(MessageId.latest) + .startMessageIdInclusive().readerName(subscription).create()) { + } + } + + @Test + public void testManyFailures() throws Exception { + try (PulsarClient client = newClient(); + Reader<byte[]> reader = client + .newReader().topic("TOO_MANY:1,TIMEOUT:1,OK:1,TIMEOUT:1,TOO_MANY:1,OK:3") + .startMessageId(MessageId.latest) + .startMessageIdInclusive().readerName(subscription).create()) { + } + } + + @Test + public void testProducerTimeoutOnPMR() throws Exception { + try (PulsarClient client = newClient(); + Producer<byte[]> producer = client.newProducer().topic("TIMEOUT:2,OK:3").create()) { + } + } + + @Test + public void testProducerTooManyOnPMR() throws Exception { + try (PulsarClient client = newClient(); + Producer<byte[]> producer = client.newProducer().topic("TOO_MANY:2,OK:3").create()) { + } + } + + @Test + public void testProducerTimeoutOnLookup() throws Exception { + try (PulsarClient client = newClient(); +
Producer<byte[]> producer = client.newProducer().topic("OK:1,TIMEOUT:2,OK:3").create()) { + } + } + + @Test + public void testProducerTooManyOnLookup() throws Exception { + try (PulsarClient client = newClient(); + Producer<byte[]> producer = client.newProducer().topic("OK:1,TOO_MANY:2,OK:3").create()) { + } + } + + /** + * <pre> + * Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame + * Client1: which has set MaxNumberOfRejectedRequestPerConnection=1, should close its connection on TooManyRequests + * Client2: which has set MaxNumberOfRejectedRequestPerConnection=100, should keep its connection + * on TooManyRequests (at most 4 per topic here; more are not injected because exponential + * backoff would make the test take a long time). + * </pre> + * + * @throws Exception + */ + @Test + public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { + String lookupUrl = pulsar.getBrokerServiceUrl(); + try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl) + .maxNumberOfRejectedRequestPerConnection(1).build()) { + + // need 2 TooManyRequests because it takes the count before incrementing + pulsarClient.newProducer().topic("TOO_MANY:2").create().close(); + + Assert.assertEquals(connectionsCreated.get(), 2); + } + + try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl) + .maxNumberOfRejectedRequestPerConnection(100).build()) { + + pulsarClient.newProducer().topic("TOO_MANY:2").create().close(); + pulsarClient.newProducer().topic("TOO_MANY:4").create().close(); + Assert.assertEquals(connectionsCreated.get(), 3); + } + } + + enum LookupError { + UNKNOWN, + TOO_MANY, + TIMEOUT, + OK, + } + + private static class ErrorByTopicServerCnx extends ServerCnx { + private final ConcurrentHashMap<String, Queue<LookupError>> failureMap; + + ErrorByTopicServerCnx(PulsarService pulsar, ConcurrentHashMap<String, Queue<LookupError>> failureMap) { + super(pulsar); + this.failureMap = failureMap; + } + + private
Queue<LookupError> errorList(String topicName) { + return failureMap.compute( + topicName, + (k, v) -> { + if (v == null) { + v = new ArrayBlockingQueue<LookupError>(100); + for (String e : k.split(",")) { + String[] parts = e.split(":"); + LookupError error = Enum.valueOf(LookupError.class, parts[0]); + for (int i = 0; i < Integer.parseInt(parts[1]); i++) { + v.add(error); + } + } + } + return v; + }); + } + + @Override + protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { + TopicName t = TopicName.get(partitionMetadata.getTopic()); + LookupError error = errorList(t.getLocalName()).poll(); + if (error == LookupError.TOO_MANY) { + final long requestId = partitionMetadata.getRequestId(); + ctx.writeAndFlush(newPartitionMetadataResponse(ServerError.TooManyRequests, "too many", requestId)); + } else if (error == LookupError.TIMEOUT) { + // deliberately send no response, simulating a TIMEOUT + } else if (error == null || error == LookupError.OK) { + super.handlePartitionMetadataRequest(partitionMetadata); + } + } + + @Override + protected void handleLookup(CommandLookupTopic lookup) { + TopicName t = TopicName.get(lookup.getTopic()); + LookupError error = errorList(t.getLocalName()).poll(); + if (error == LookupError.TOO_MANY) { + final long requestId = lookup.getRequestId(); + ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, "too many", requestId)); + } else if (error == LookupError.TIMEOUT) { + // deliberately send no response, simulating a TIMEOUT + } else if (error == null || error == LookupError.OK) { + super.handleLookup(lookup); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index 9b9c600..7fa08ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -97,7 +97,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .topic(topicName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type JSON, new schema type AVRO")); } @Test @@ -123,7 +123,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .subscriptionName(subName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, consumerBuilder::subscribe); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type JSON, new schema type AVRO")); } @Test @@ -149,7 +149,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .topic(topicName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type JSON, new schema type AVRO")); } @Test @@ -178,7 +178,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .subscriptionName(subName + "2"); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, consumerBuilder::subscribe); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type JSON, new schema type AVRO")); } @Test @@ -200,7 +200,7 @@ public class 
SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .topic(topicName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type INT32, new schema type STRING")); } @Test @@ -226,7 +226,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .subscriptionName(subName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, consumerBuilder::subscribe); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type INT32, new schema type STRING")); } @Test @@ -252,7 +252,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .topic(topicName); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, producerBuilder::create); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type INT32, new schema type STRING")); } @Test @@ -281,7 +281,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes .subscriptionName(subName + "2"); Throwable t = expectThrows(PulsarClientException.IncompatibleSchemaException.class, consumerBuilder::subscribe); - assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type INT32, new schema type STRING")); + assertTrue(t.getMessage().contains("Incompatible schema: exists schema type INT32, new schema type STRING")); } @Test diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 2605afd..5840340 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -218,6 +218,24 @@ public interface ClientBuilder extends Cloneable { ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit); /** + * Set lookup timeout <i>(default: matches operation timeout)</i>. + * + * <p>Lookup operations have a different load pattern to other operations. They can be handled by any broker, + * are not proportional to throughput, and are harmless to retry. Given this, it makes sense to allow them to retry + * longer than normal operations, especially if they experience a timeout. + * + * <p>By default this is set to match operation timeout. This is to maintain legacy behaviour. However, in practice + * it should be set to 5-10x the operation timeout. + * + * @param lookupTimeout + * lookup timeout + * @param unit + * time unit for {@code lookupTimeout} + * @return the client builder instance + */ + ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit); + + /** + * Set the number of threads to be used for handling connections to brokers <i>(default: 1 thread)</i>.
* * @param numIoThreads the number of IO threads diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 8654203..5c0211f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import org.apache.pulsar.common.classification.InterfaceAudience; @@ -32,6 +33,8 @@ import org.apache.pulsar.common.classification.InterfaceStability; @SuppressWarnings("serial") public class PulsarClientException extends IOException { private long sequenceId = -1; + private Collection<Throwable> previous; + /** * Constructs an {@code PulsarClientException} with the specified detail message. * @@ -84,6 +87,49 @@ public class PulsarClientException extends IOException { } /** + * Set the previous exceptions which occurred for the same operation + * and have been retried, replacing any previously recorded ones. + * + * @param previous A collection of throwables that triggered retries + */ + public void setPreviousExceptions(Collection<Throwable> previous) { + this.previous = previous; + } + + /** + * Get the collection of previous exceptions which have caused retries + * for this operation.
+ * + * @return a collection of exceptions, ordered as they occurred, or {@code null} if none were recorded + */ + public Collection<Throwable> getPreviousExceptions() { + return this.previous; + } + + @Override + public String toString() { + if (previous == null || previous.isEmpty()) { + return super.toString(); + } else { + StringBuilder sb = new StringBuilder(super.toString()); + int i = 0; + boolean first = true; + sb.append("{\"previous\":["); + for (Throwable t : previous) { + if (first) { + first = false; + } else { + sb.append(','); + } + sb.append("{\"attempt\":").append(i++) + .append(",\"error\":\"").append(t.toString().replace("\\", "\\\\").replace("\"", "\\\"")) + .append("\"}"); + } + sb.append("]}"); + return sb.toString(); + } + } + /** * Constructs an {@code PulsarClientException} with the specified cause. * * @param t @@ -957,77 +1003,110 @@ public class PulsarClientException extends IOException { // site Throwable cause = t.getCause(); String msg = cause.getMessage(); + PulsarClientException newException = null; if (cause instanceof TimeoutException) { - return new TimeoutException(msg); + newException = new TimeoutException(msg); } else if (cause instanceof InvalidConfigurationException) { - return new InvalidConfigurationException(msg); + newException = new InvalidConfigurationException(msg); } else if (cause instanceof AuthenticationException) { - return new AuthenticationException(msg); + newException = new AuthenticationException(msg); } else if (cause instanceof IncompatibleSchemaException) { - return new IncompatibleSchemaException(msg); + newException = new IncompatibleSchemaException(msg); } else if (cause instanceof TooManyRequestsException) { - return new TooManyRequestsException(msg); + newException = new TooManyRequestsException(msg); } else if (cause instanceof LookupException) { - return new LookupException(msg); + newException = new LookupException(msg); } else if (cause instanceof ConnectException) { - return new ConnectException(msg); + newException = new ConnectException(msg); } else if
(cause instanceof AlreadyClosedException) { - return new AlreadyClosedException(msg); + newException = new AlreadyClosedException(msg); } else if (cause instanceof TopicTerminatedException) { - return new TopicTerminatedException(msg); + newException = new TopicTerminatedException(msg); } else if (cause instanceof AuthorizationException) { - return new AuthorizationException(msg); + newException = new AuthorizationException(msg); } else if (cause instanceof GettingAuthenticationDataException) { - return new GettingAuthenticationDataException(msg); + newException = new GettingAuthenticationDataException(msg); } else if (cause instanceof UnsupportedAuthenticationException) { - return new UnsupportedAuthenticationException(msg); + newException = new UnsupportedAuthenticationException(msg); } else if (cause instanceof BrokerPersistenceException) { - return new BrokerPersistenceException(msg); + newException = new BrokerPersistenceException(msg); } else if (cause instanceof BrokerMetadataException) { - return new BrokerMetadataException(msg); + newException = new BrokerMetadataException(msg); } else if (cause instanceof ProducerBusyException) { - return new ProducerBusyException(msg); + newException = new ProducerBusyException(msg); } else if (cause instanceof ConsumerBusyException) { - return new ConsumerBusyException(msg); + newException = new ConsumerBusyException(msg); } else if (cause instanceof NotConnectedException) { - return new NotConnectedException(); + newException = new NotConnectedException(); } else if (cause instanceof InvalidMessageException) { - return new InvalidMessageException(msg); + newException = new InvalidMessageException(msg); } else if (cause instanceof InvalidTopicNameException) { - return new InvalidTopicNameException(msg); + newException = new InvalidTopicNameException(msg); } else if (cause instanceof NotSupportedException) { - return new NotSupportedException(msg); + newException = new NotSupportedException(msg); } else if (cause 
instanceof NotAllowedException) { - return new NotAllowedException(msg); + newException = new NotAllowedException(msg); } else if (cause instanceof ProducerQueueIsFullError) { - return new ProducerQueueIsFullError(msg); + newException = new ProducerQueueIsFullError(msg); } else if (cause instanceof ProducerBlockedQuotaExceededError) { - return new ProducerBlockedQuotaExceededError(msg); + newException = new ProducerBlockedQuotaExceededError(msg); } else if (cause instanceof ProducerBlockedQuotaExceededException) { - return new ProducerBlockedQuotaExceededException(msg); + newException = new ProducerBlockedQuotaExceededException(msg); } else if (cause instanceof ChecksumException) { - return new ChecksumException(msg); + newException = new ChecksumException(msg); } else if (cause instanceof CryptoException) { - return new CryptoException(msg); + newException = new CryptoException(msg); } else if (cause instanceof ConsumerAssignException) { - return new ConsumerAssignException(msg); + newException = new ConsumerAssignException(msg); } else if (cause instanceof MessageAcknowledgeException) { - return new MessageAcknowledgeException(msg); + newException = new MessageAcknowledgeException(msg); } else if (cause instanceof TransactionConflictException) { - return new TransactionConflictException(msg); + newException = new TransactionConflictException(msg); } else if (cause instanceof TopicDoesNotExistException) { - return new TopicDoesNotExistException(msg); + newException = new TopicDoesNotExistException(msg); } else if (cause instanceof ProducerFencedException) { - return new ProducerFencedException(msg); + newException = new ProducerFencedException(msg); } else if (cause instanceof MemoryBufferIsFullError) { - return new MemoryBufferIsFullError(msg); + newException = new MemoryBufferIsFullError(msg); } else if (cause instanceof NotFoundException) { - return new NotFoundException(msg); + newException = new NotFoundException(msg); } else { - return new 
PulsarClientException(t); + } + + Collection<Throwable> previousExceptions = getPreviousExceptions(t); + if (previousExceptions != null) { + newException.setPreviousExceptions(previousExceptions); + } + return newException; } + public static Collection<Throwable> getPreviousExceptions(Throwable t) { // first non-null list found on t or its causes + Throwable e = t; + for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) { // bounded so a cyclic/deep cause chain cannot loop forever + if (e instanceof PulsarClientException) { + Collection<Throwable> previous = ((PulsarClientException)e).getPreviousExceptions(); + if (previous != null) { + return previous; + } + } + e = e.getCause(); // advance down the chain (t.getCause() would re-inspect the same link every iteration) + } + return null; + } + + public static void setPreviousExceptions(Throwable t, Collection<Throwable> previous) { // sets on first PulsarClientException in t's cause chain + Throwable e = t; + for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) { + if (e instanceof PulsarClientException) { + ((PulsarClientException)e).setPreviousExceptions(previous); + return; + } + e = e.getCause(); // advance down the chain, not back to t's direct cause + } + } + + public long getSequenceId() { return sequenceId; } @@ -1059,4 +1138,4 @@ public class PulsarClientException extends IOException { } return true; } -} \ No newline at end of file +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index d7fa818..c8a4376 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -145,6 +145,12 @@ public class ClientBuilderImpl implements ClientBuilder { } @Override + public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) { + conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout)); + return this; + } + + @Override public ClientBuilder ioThreads(int numIoThreads) { conf.setNumIoThreads(numIoThreads); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index c36a7a6..23f06cb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -140,6 +140,8 @@ public class ClientCnx extends PulsarHandler { private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier(); private ScheduledFuture<?> timeoutTask; + private SocketAddress localAddress; + private SocketAddress remoteAddress; // Added for mutual authentication. @Getter @@ -208,6 +210,9 @@ public class ClientCnx extends PulsarHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); + this.localAddress = ctx.channel().localAddress(); + this.remoteAddress = ctx.channel().remoteAddress(); + this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(), operationTimeoutMs, operationTimeoutMs, TimeUnit.MILLISECONDS); @@ -419,7 +424,8 @@ public class ClientCnx extends PulsarHandler { completableFuture.complete(null); } else { completableFuture.completeExceptionally( - getPulsarClientException(ackResponse.getError(), ackResponse.getMessage())); + getPulsarClientException(ackResponse.getError(), + buildError(ackResponse.getRequestId(), ackResponse.getMessage()))); } } else { log.warn("AckResponse has complete when receive response! 
requestId : {}, consumerId : {}", @@ -550,7 +556,8 @@ public class ClientCnx extends PulsarHandler { if (lookupResult.hasError()) { checkServerError(lookupResult.getError(), lookupResult.getMessage()); requestFuture.completeExceptionally( - getPulsarClientException(lookupResult.getError(), lookupResult.getMessage())); + getPulsarClientException(lookupResult.getError(), + buildError(lookupResult.getRequestId(), lookupResult.getMessage()))); } else { requestFuture .completeExceptionally(new PulsarClientException.LookupException("Empty lookup response")); @@ -583,7 +590,8 @@ public class ClientCnx extends PulsarHandler { if (!lookupResult.hasResponse() || CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse())) { if (lookupResult.hasError()) { - String message = lookupResult.hasMessage() ? lookupResult.getMessage() : null; + String message = buildError(lookupResult.getRequestId(), + lookupResult.hasMessage() ? lookupResult.getMessage() : null); checkServerError(lookupResult.getError(), message); requestFuture.completeExceptionally( getPulsarClientException(lookupResult.getError(), message)); @@ -692,7 +700,9 @@ public class ClientCnx extends PulsarHandler { } CompletableFuture<?> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage())); + requestFuture.completeExceptionally( + getPulsarClientException(error.getError(), + buildError(error.getRequestId(), error.getMessage()))); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), error.getRequestId()); } @@ -889,7 +899,8 @@ public class ClientCnx extends PulsarHandler { return CompletableFuture.completedFuture(Optional.empty()); } else { return FutureUtil.failedFuture( - getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage())); + getPulsarClientException(rc, + buildError(requestId, 
commandGetSchemaResponse.getErrorMessage()))); } } else { return CompletableFuture.completedFuture( @@ -913,7 +924,7 @@ public class ClientCnx extends PulsarHandler { return CompletableFuture.completedFuture(SchemaVersion.Empty.bytes()); } else { return FutureUtil.failedFuture(getPulsarClientException( - rc, response.getErrorMessage())); + rc, buildError(requestId, response.getErrorMessage()))); } } else { return CompletableFuture.completedFuture(response.getSchemaVersion()); @@ -1082,6 +1093,14 @@ public class ClientCnx extends PulsarHandler { this.remoteHostName = remoteHostName; } + private String buildError(long requestId, String errorMsg) { + return new StringBuilder().append("{\"errorMsg\":\"").append(errorMsg) + .append("\",\"reqId\":").append(requestId) + .append(", \"remote\":\"").append(remoteAddress) + .append("\", \"local\":\"").append(localAddress) + .append("\"}").toString(); + } + public static PulsarClientException getPulsarClientException(ServerError error, String errorMsg) { switch (error) { case AuthenticationError: @@ -1144,8 +1163,10 @@ public class ClientCnx extends PulsarHandler { && !requestFuture.hasGotResponse()) { pendingRequests.remove(request.requestId, requestFuture); if (!requestFuture.isDone()) { - String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, - request.requestType.getDescription(), operationTimeoutMs); + String timeoutMessage = String.format( + "%s timeout {'durationMs': '%d', 'reqId':'%d', 'remote':'%s', 'local':'%s'}", + request.requestType.getDescription(), operationTimeoutMs, + request.requestId, remoteAddress, localAddress); if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) { log.warn("{} {}", ctx.channel(), timeoutMessage); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 17d8043..9a9f0ec 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,12 +48,14 @@ import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -127,7 +129,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle protected volatile MessageId lastDequeuedMessageId = MessageId.earliest; private volatile MessageId lastMessageIdInBroker = MessageId.earliest; - private final long subscribeTimeout; + private final long lookupDeadline; + + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater<ConsumerImpl> SUBSCRIBE_DEADLINE_UPDATER = AtomicLongFieldUpdater + .newUpdater(ConsumerImpl.class, "subscribeDeadline"); + @SuppressWarnings("unused") + private volatile long subscribeDeadline = 0; // gets set on first successful connection + private final int partitionIndex; private final boolean hasParentConsumer; @@ -191,6 +200,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final ExecutorService internalPinnedExecutor; + private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>(); static <T> 
ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, @@ -243,7 +253,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0); - this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs(); + this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); this.partitionIndex = partitionIndex; this.hasParentConsumer = hasParentConsumer; this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2; @@ -688,6 +698,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override public void connectionOpened(final ClientCnx cnx) { + previousExceptions.clear(); + if (getState() == State.Closing || getState() == State.Closed) { setState(State.Closed); closeConsumerTasks(); @@ -706,6 +718,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle acknowledgmentsGroupingTracker.flushAndClean(); } + SUBSCRIBE_DEADLINE_UPDATER + .compareAndSet(this, 0L, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs()); + int currentSize; synchronized (this) { currentSize = incomingMessages.size(); @@ -778,7 +793,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (e.getCause() instanceof PulsarClientException && PulsarClientException.isRetriableError(e.getCause()) - && System.currentTimeMillis() < subscribeTimeout) { + && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { reconnectLater(e.getCause()); } else if (!subscribeFuture.isDone()) { // unable to create new consumer, fail operation @@ -882,17 +897,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @Override public void
connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); - boolean timeout = System.currentTimeMillis() > subscribeTimeout; - if ((nonRetriableError || timeout) && subscribeFuture.completeExceptionally(exception)) { - setState(State.Failed); - if (nonRetriableError) { - log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", topic, consumerId, exception); - } else { - log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId); + boolean timeout = System.currentTimeMillis() > lookupDeadline; + if (nonRetriableError || timeout) { + exception.setPreviousExceptions(new CopyOnWriteArrayList<>(previousExceptions)); // snapshot: connectionOpened() clears the live list + if (subscribeFuture.completeExceptionally(exception)) { + setState(State.Failed); + if (nonRetriableError) { + log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", topic, consumerId, exception); + } else { + log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId); + } + closeConsumerTasks(); + deregisterFromClientCnx(); + client.cleanupConsumer(this); } - closeConsumerTasks(); - deregisterFromClientCnx(); - client.cleanupConsumer(this); + } else { + previousExceptions.add(exception); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9513caa..f4f6c3c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -52,6 +52,7 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -102,7 +103,14
@@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private final Queue<OpSendMsg> pendingMessages; private final Optional<Semaphore> semaphore; private volatile Timeout sendTimeout = null; - private long createProducerTimeout; + private final long lookupDeadline; + + @SuppressWarnings("rawtypes") + private static final AtomicLongFieldUpdater<ProducerImpl> PRODUCER_DEADLINE_UPDATER = AtomicLongFieldUpdater + .newUpdater(ProducerImpl.class, "producerDeadline"); + @SuppressWarnings("unused") + private volatile long producerDeadline = 0; // gets set on first successful connection + private final BatchMessageContainerBase batchMessageContainer; private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null); @@ -140,6 +148,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private ScheduledFuture<?> batchTimerTask; private Optional<Long> topicEpoch = Optional.empty(); + private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>(); @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater @@ -217,7 +226,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS); } - this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs(); + this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); if (conf.isBatchingEnabled()) { BatcherBuilder containerBuilder = conf.getBatcherBuilder(); if (containerBuilder == null) { @@ -1282,6 +1291,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public void connectionOpened(final ClientCnx cnx) { + previousExceptions.clear(); + // we set the cnx reference before registering the producer on the 
cnx, so if the cnx breaks before creating the // producer, it will try to grab a new cnx connectionHandler.setClientCnx(cnx); @@ -1291,6 +1302,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne long requestId = client.newRequestId(); + PRODUCER_DEADLINE_UPDATER + .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs()); + SchemaInfo schemaInfo = null; if (schema != null) { if (schema.getSchemaInfo() != null) { @@ -1437,8 +1451,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne producerCreatedFuture.completeExceptionally(cause); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || // - (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) - && System.currentTimeMillis() < createProducerTimeout)) { + (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) + && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) { // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are // still within the initial timeout budget and we are dealing with a retriable error reconnectLater(cause); @@ -1460,15 +1474,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public void connectionFailed(PulsarClientException exception) { boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); - boolean producerTimeout = System.currentTimeMillis() > createProducerTimeout; - if ((nonRetriableError || producerTimeout) && producerCreatedFuture.completeExceptionally(exception)) { - if (nonRetriableError) { - log.info("[{}] Producer creation failed for producer {} with unretriableError = {}", topic, producerId, exception); - } else { - log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId); + boolean timeout = 
System.currentTimeMillis() > lookupDeadline; + if (nonRetriableError || timeout) { + exception.setPreviousExceptions(previousExceptions); + if (producerCreatedFuture.completeExceptionally(exception)) { + if (nonRetriableError) { + log.info("[{}] Producer creation failed for producer {} with unretriableError = {}", topic, producerId, exception); + } else { + log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId); + } + setState(State.Failed); + client.cleanupProducer(this); } - setState(State.Failed); - client.cleanupProducer(this); + } else { + previousExceptions.add(exception); } } @@ -1877,4 +1896,4 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); -} \ No newline at end of file +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 894f67d..5b999a7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -882,13 +882,14 @@ public class PulsarClientImpl implements PulsarClient { try { TopicName topicName = TopicName.get(topic); - AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs()); + AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs()); Backoff backoff = new BackoffBuilder() .setInitialTime(100, TimeUnit.MILLISECONDS) .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) .setMax(1, TimeUnit.MINUTES) .create(); - getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture); + getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, + metadataFuture, new ArrayList<>()); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -898,23 +899,27 @@ public class PulsarClientImpl implements PulsarClient { private void getPartitionedTopicMetadata(TopicName topicName, Backoff backoff, AtomicLong remainingTime, - CompletableFuture<PartitionedTopicMetadata> future) { + CompletableFuture<PartitionedTopicMetadata> future, + List<Throwable> previousExceptions) { + long startTime = System.nanoTime(); lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { + remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException` boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause()) - || e.getCause() instanceof PulsarClientException.TooManyRequestsException || e.getCause() instanceof PulsarClientException.AuthenticationException; if (nextDelay <= 0 || isLookupThrottling) { + PulsarClientException.setPreviousExceptions(e, previousExceptions); future.completeExceptionally(e); return null; } + previousExceptions.add(e); ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> { log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); - getPartitionedTopicMetadata(topicName, backoff, remainingTime, future); + getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 0723708..60b3370 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -96,6 +96,12 @@ public class ClientConfigurationData implements Serializable, Cloneable { private long operationTimeoutMs = 30000; @ApiModelProperty( + name = "lookupTimeoutMs", + value = "Client lookup timeout (in millisecond)." + ) + private long lookupTimeoutMs = -1; + + @ApiModelProperty( name = "statsIntervalSeconds", value = " Interval to print client stats (in second)." ) @@ -327,6 +333,14 @@ public class ClientConfigurationData implements Serializable, Cloneable { return false; } + public long getLookupTimeoutMs() { + if (lookupTimeoutMs >= 0) { + return lookupTimeoutMs; + } else { + return operationTimeoutMs; + } + } + public ClientConfigurationData clone() { try { return (ClientConfigurationData) super.clone(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 4484ab9..ea651ac 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1044,7 +1044,7 @@ public class Commands { serializedCmdPong.release(); } - static ByteBuf newPong() { + public static ByteBuf newPong() { return cmdPong.retainedDuplicate(); }