This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c684a8d3424e2db7d29ae22376f7d2e865eac721 Author: Lari Hotari <[email protected]> AuthorDate: Wed Aug 7 08:22:25 2024 +0300 [fix][client] Fix timeout handling in Pulsar Admin client (#23128) (cherry picked from commit 3b01c96594ae1af215018b1e1df29e5416f240d9) (cherry picked from commit 0326a3729428fd88fb92dcbae0c0d9eca9262de9) --- pulsar-client-admin/pom.xml | 7 ++ .../pulsar/client/admin/internal/BaseResource.java | 8 +- .../pulsar/client/admin/internal/BookiesImpl.java | 4 +- .../client/admin/internal/BrokerStatsImpl.java | 4 +- .../pulsar/client/admin/internal/BrokersImpl.java | 4 +- .../pulsar/client/admin/internal/ClustersImpl.java | 4 +- .../client/admin/internal/ComponentResource.java | 4 +- .../client/admin/internal/FunctionsImpl.java | 4 +- .../client/admin/internal/NamespacesImpl.java | 4 +- .../admin/internal/NonPersistentTopicsImpl.java | 4 +- .../pulsar/client/admin/internal/PackagesImpl.java | 4 +- .../client/admin/internal/ProxyStatsImpl.java | 4 +- .../client/admin/internal/PulsarAdminImpl.java | 46 +++---- .../client/admin/internal/ResourceGroupsImpl.java | 4 +- .../client/admin/internal/ResourceQuotasImpl.java | 4 +- .../pulsar/client/admin/internal/SchemasImpl.java | 4 +- .../pulsar/client/admin/internal/SinksImpl.java | 4 +- .../pulsar/client/admin/internal/SourcesImpl.java | 4 +- .../pulsar/client/admin/internal/TenantsImpl.java | 4 +- .../pulsar/client/admin/internal/TopicsImpl.java | 4 +- .../client/admin/internal/TransactionsImpl.java | 4 +- .../pulsar/client/admin/internal/WorkerImpl.java | 4 +- .../admin/internal/http/AsyncHttpConnector.java | 37 ++++-- .../internal/http/AsyncHttpConnectorTest.java | 140 +++++++++++++++++++++ pulsar-client-admin/src/test/resources/log4j2.xml | 41 ++++++ .../java/org/apache/pulsar/admin/cli/CmdBase.java | 14 +-- 26 files changed, 287 insertions(+), 82 deletions(-) diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index c7a390b469b..9fd7a54f5b3 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -107,6 +107,13 @@ <artifactId>pulsar-package-core</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock-jre8</artifactId> + <version>${wiremock.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index 8316aa8b4e5..6cc025aaf16 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -60,11 +60,11 @@ public abstract class BaseResource { private static final Logger log = LoggerFactory.getLogger(BaseResource.class); protected final Authentication auth; - protected final long readTimeoutMs; + protected final long requestTimeoutMs; - protected BaseResource(Authentication auth, long readTimeoutMs) { + protected BaseResource(Authentication auth, long requestTimeoutMs) { this.auth = auth; - this.readTimeoutMs = readTimeoutMs; + this.requestTimeoutMs = requestTimeoutMs; } public Builder request(final WebTarget target) throws PulsarAdminException { @@ -303,7 +303,7 @@ public abstract class BaseResource { protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException { try { - return executor.get().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + return executor.get().get(this.requestTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new PulsarAdminException(e); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java index f5f9a248b12..a749bb6ce9b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java @@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; public class BookiesImpl extends BaseResource implements Bookies { private final WebTarget adminBookies; - public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public BookiesImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminBookies = web.path("/admin/v2/bookies"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java index 86d9af6a8fb..89d3f1cc16b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokerStatsImpl.java @@ -38,8 +38,8 @@ public class BrokerStatsImpl extends BaseResource implements BrokerStats { private final WebTarget adminBrokerStats; private final WebTarget adminV2BrokerStats; - public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public BrokerStatsImpl(WebTarget target, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminBrokerStats = target.path("/admin/broker-stats"); adminV2BrokerStats = target.path("/admin/v2/broker-stats"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index 81de4ed6e05..a67608f55e3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -37,8 +37,8 @@ import org.apache.pulsar.common.util.Codec; public class BrokersImpl extends BaseResource implements Brokers { private final WebTarget adminBrokers; - public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public BrokersImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminBrokers = web.path("admin/v2/brokers"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 96744ff490a..6c59a35aea4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -45,8 +45,8 @@ public class ClustersImpl extends BaseResource implements Clusters { private final WebTarget adminClusters; - public ClustersImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public ClustersImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminClusters = web.path("/admin/v2/clusters"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java index 2aa8a884b4b..eeb7dd1058f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java @@ -34,8 +34,8 @@ import org.asynchttpclient.RequestBuilder; */ public class ComponentResource extends BaseResource { - protected ComponentResource(Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + protected ComponentResource(Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); } public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 0686917a381..750f642f365 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -72,8 +72,8 @@ public class FunctionsImpl extends ComponentResource implements Functions { private final WebTarget functions; private final AsyncHttpClient asyncHttpClient; - public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) { - super(auth, readTimeoutMs); + public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.functions = web.path("/admin/v3/functions"); this.asyncHttpClient = asyncHttpClient; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index bfb060596b1..ebaa9dac4b6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -64,8 +64,8 @@ public class NamespacesImpl extends BaseResource implements Namespaces { private final WebTarget adminNamespaces; private final WebTarget adminV2Namespaces; - public NamespacesImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public NamespacesImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminNamespaces = web.path("/admin/namespaces"); adminV2Namespaces = web.path("/admin/v2/namespaces"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java index 635df525619..4de084554b7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NonPersistentTopicsImpl.java @@ -38,8 +38,8 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste private final WebTarget adminNonPersistentTopics; private final WebTarget adminV2NonPersistentTopics; - public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public NonPersistentTopicsImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminNonPersistentTopics = web.path("/admin"); adminV2NonPersistentTopics = web.path("/admin/v2"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java index 885e39c1ce6..c2b0f6b7be9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java @@ -57,8 +57,8 @@ public class PackagesImpl extends ComponentResource implements Packages { private final WebTarget packages; private final AsyncHttpClient httpClient; - public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpClient client, long readTimeoutMs) { - super(auth, readTimeoutMs); + public PackagesImpl(WebTarget webTarget, Authentication auth, AsyncHttpClient client, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.httpClient = client; this.packages = webTarget.path("/admin/v3/packages"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java index 084e3b65dbc..f4bacc346b6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java @@ -32,8 +32,8 @@ public class ProxyStatsImpl extends BaseResource implements ProxyStats { private final WebTarget adminProxyStats; - public ProxyStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public ProxyStatsImpl(WebTarget target, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminProxyStats = target.path("/proxy-stats"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index 50b0d3e2b31..d78394ef936 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -153,29 +153,29 @@ public class PulsarAdminImpl implements PulsarAdmin { Math.toIntExact(clientConfigData.getRequestTimeoutMs()), clientConfigData.getAutoCertRefreshSeconds()); - long readTimeoutMs = clientConfigData.getReadTimeoutMs(); - this.clusters = new ClustersImpl(root, auth, readTimeoutMs); - this.brokers = new BrokersImpl(root, auth, readTimeoutMs); - this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs); - this.proxyStats = new ProxyStatsImpl(root, auth, readTimeoutMs); - this.tenants = new TenantsImpl(root, auth, readTimeoutMs); - this.resourcegroups = new ResourceGroupsImpl(root, auth, readTimeoutMs); - this.properties = new TenantsImpl(root, auth, readTimeoutMs); - this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs); - this.topics = new TopicsImpl(root, auth, readTimeoutMs); - this.localTopicPolicies = new TopicPoliciesImpl(root, auth, readTimeoutMs, false); - this.globalTopicPolicies = new TopicPoliciesImpl(root, auth, readTimeoutMs, true); - this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, readTimeoutMs); - this.resourceQuotas = new ResourceQuotasImpl(root, auth, readTimeoutMs); - this.lookups = new LookupImpl(root, auth, useTls, readTimeoutMs, topics); - this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs); - this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs); - this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs); - this.worker = new WorkerImpl(root, auth, readTimeoutMs); - this.schemas = new SchemasImpl(root, auth, readTimeoutMs); - this.bookies = new BookiesImpl(root, auth, readTimeoutMs); - this.packages = new PackagesImpl(root, auth, asyncHttpConnector.getHttpClient(), readTimeoutMs); - this.transactions = new TransactionsImpl(root, auth, readTimeoutMs); + long requestTimeoutMs = clientConfigData.getRequestTimeoutMs(); + this.clusters = new ClustersImpl(root, auth, requestTimeoutMs); + this.brokers = new BrokersImpl(root, auth, requestTimeoutMs); + this.brokerStats = new BrokerStatsImpl(root, auth, requestTimeoutMs); + this.proxyStats = new ProxyStatsImpl(root, auth, requestTimeoutMs); + this.tenants = new TenantsImpl(root, auth, requestTimeoutMs); + this.resourcegroups = new ResourceGroupsImpl(root, auth, requestTimeoutMs); + this.properties = new TenantsImpl(root, auth, requestTimeoutMs); + this.namespaces = new NamespacesImpl(root, auth, requestTimeoutMs); + this.topics = new TopicsImpl(root, auth, requestTimeoutMs); + this.localTopicPolicies = new TopicPoliciesImpl(root, auth, requestTimeoutMs, false); + this.globalTopicPolicies = new TopicPoliciesImpl(root, auth, requestTimeoutMs, true); + this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, requestTimeoutMs); + this.resourceQuotas = new ResourceQuotasImpl(root, auth, requestTimeoutMs); + this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs, topics); + this.functions = new FunctionsImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.sources = new SourcesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.sinks = new SinksImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.worker = new WorkerImpl(root, auth, requestTimeoutMs); + this.schemas = new SchemasImpl(root, auth, requestTimeoutMs); + this.bookies = new BookiesImpl(root, auth, requestTimeoutMs); + this.packages = new PackagesImpl(root, auth, asyncHttpConnector.getHttpClient(), requestTimeoutMs); + this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs); if (originalCtxLoader != null) { Thread.currentThread().setContextClassLoader(originalCtxLoader); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java index 99d60bcf259..9548a10f2fc 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java @@ -32,8 +32,8 @@ import org.apache.pulsar.common.policies.data.ResourceGroup; public class ResourceGroupsImpl extends BaseResource implements ResourceGroups { private final WebTarget adminResourceGroups; - public ResourceGroupsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public ResourceGroupsImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminResourceGroups = web.path("/admin/v2/resourcegroups"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java index 7ab917a8532..e48c0b5552a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceQuotasImpl.java @@ -33,8 +33,8 @@ public class ResourceQuotasImpl extends BaseResource implements ResourceQuotas { private final WebTarget adminQuotas; private final WebTarget adminV2Quotas; - public ResourceQuotasImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public ResourceQuotasImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminQuotas = web.path("/admin/resource-quotas"); adminV2Quotas = web.path("/admin/v2/resource-quotas"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java index 65417ec9e72..45093e0481b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java @@ -46,8 +46,8 @@ public class SchemasImpl extends BaseResource implements Schemas { private final WebTarget adminV2; private final WebTarget adminV1; - public SchemasImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public SchemasImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.adminV1 = web.path("/admin/schemas"); this.adminV2 = web.path("/admin/v2/schemas"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index f27071eb72b..79f52f8a669 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -55,8 +55,8 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { private final WebTarget sink; private final AsyncHttpClient asyncHttpClient; - public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) { - super(auth, readTimeoutMs); + public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.sink = web.path("/admin/v3/sink"); this.asyncHttpClient = asyncHttpClient; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index 4996e8b6e94..bfd5c86ac1b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -54,8 +54,8 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { private final WebTarget source; private final AsyncHttpClient asyncHttpClient; - public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long readTimeoutMs) { - super(auth, readTimeoutMs); + public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.source = web.path("/admin/v3/source"); this.asyncHttpClient = asyncHttpClient; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java index 37fd2ba753b..926e6eadaac 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TenantsImpl.java @@ -34,8 +34,8 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; public class TenantsImpl extends BaseResource implements Tenants, Properties { private final WebTarget adminTenants; - public TenantsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public TenantsImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminTenants = web.path("/admin/v2/tenants"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 01ac1c85a89..ce32d1b6ada 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -131,8 +131,8 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys"; // CHECKSTYLE.ON: MemberName - public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public TopicsImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminTopics = web.path("/admin"); adminV2Topics = web.path("/admin/v2"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 6af80344287..b82cc2dfcb1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -43,8 +43,8 @@ import org.apache.pulsar.common.stats.PositionInPendingAckStats; public class TransactionsImpl extends BaseResource implements Transactions { private final WebTarget adminV3Transactions; - public TransactionsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public TransactionsImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); adminV3Transactions = web.path("/admin/v3/transactions"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java index 551ffa29b4d..bebf32b1baa 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java @@ -43,8 +43,8 @@ public class WorkerImpl extends BaseResource implements Worker { private final WebTarget workerStats; private final WebTarget worker; - public WorkerImpl(WebTarget web, Authentication auth, long readTimeoutMs) { - super(auth, readTimeoutMs); + public WorkerImpl(WebTarget web, Authentication auth, long requestTimeoutMs) { + super(auth, requestTimeoutMs); this.worker = web.path("/admin/v2/worker"); this.workerStats = web.path("/admin/v2/worker-stats"); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index 9ce7bb82af7..cfaf4aa5e4d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -35,7 +35,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import java.util.function.Supplier; import javax.net.ssl.SSLContext; import javax.ws.rs.client.Client; @@ -59,6 +58,7 @@ import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; @@ -74,11 +74,11 @@ import org.glassfish.jersey.client.spi.Connector; */ @Slf4j public class AsyncHttpConnector implements Connector { - private static final TimeoutException READ_TIMEOUT_EXCEPTION = - FutureUtil.createTimeoutException("Read timeout", AsyncHttpConnector.class, "retryOrTimeout(...)"); + private static final TimeoutException REQUEST_TIMEOUT_EXCEPTION = + FutureUtil.createTimeoutException("Request timeout", AsyncHttpConnector.class, "retryOrTimeout(...)"); @Getter private final AsyncHttpClient httpClient; - private final Duration readTimeout; + private final Duration requestTimeout; private final int maxRetries; private final PulsarServiceNameResolver serviceNameResolver; private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1, @@ -181,7 +181,7 @@ public class AsyncHttpConnector implements Connector { confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable()); } httpClient = new DefaultAsyncHttpClient(confBuilder.build()); - this.readTimeout = Duration.ofMillis(readTimeoutMs); + this.requestTimeout = requestTimeoutMs > 0 ? Duration.ofMillis(requestTimeoutMs) : null; this.maxRetries = httpClient.getConfig().getMaxRequestRetry(); } @@ -257,9 +257,10 @@ public class AsyncHttpConnector implements Connector { private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) { final CompletableFuture<Response> resultFuture = new CompletableFuture<>(); retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries); - CompletableFuture<Response> timeoutAfter = FutureUtil.createFutureWithTimeout(readTimeout, delayer, - () -> READ_TIMEOUT_EXCEPTION); - return resultFuture.applyToEither(timeoutAfter, Function.identity()); + if (requestTimeout != null) { + FutureUtil.addTimeoutHandling(resultFuture, requestTimeout, delayer, () -> REQUEST_TIMEOUT_EXCEPTION); + } + return resultFuture; } private <T> void retryOperation( @@ -278,11 +279,18 @@ public class AsyncHttpConnector implements Connector { new RetryException("Operation future was cancelled.", throwable)); } else { if (retries > 0) { + if (log.isDebugEnabled()) { + log.debug("Retrying operation. Remaining retries: {}", retries); + } retryOperation( resultFuture, operation, retries - 1); } else { + if (log.isDebugEnabled()) { + log.debug("Number of retries has been exhausted. Failing the operation.", + throwable); + } resultFuture.completeExceptionally( new RetryException("Could not complete the operation. Number of retries " + "has been exhausted. Failed reason: " + throwable.getMessage(), @@ -308,7 +316,7 @@ public class AsyncHttpConnector implements Connector { } } - private CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) { + protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) { ClientRequest currentRequest = new ClientRequest(request); URI newUri = replaceWithNew(host, currentRequest.getUri()); currentRequest.setUri(newUri); @@ -336,7 +344,16 @@ public class AsyncHttpConnector implements Connector { } }); - return builder.execute().toCompletableFuture(); + ListenableFuture<Response> responseFuture = builder.execute(); + CompletableFuture<Response> completableFuture = responseFuture.toCompletableFuture(); + completableFuture.whenComplete((response, throwable) -> { + if (throwable != null && (throwable instanceof CancellationException + || throwable instanceof TimeoutException)) { + // abort the request if the future is cancelled or timed out + responseFuture.abort(throwable); + } + }); + return completableFuture; } @Override diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java new file mode 100644 index 00000000000..aee3ad48cde --- /dev/null +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal.http; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.asynchttpclient.Response; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.ClientResponse; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; +import org.glassfish.jersey.client.spi.AsyncConnectorCallback; +import org.glassfish.jersey.internal.MapPropertiesDelegate; +import org.glassfish.jersey.internal.PropertiesDelegate; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class AsyncHttpConnectorTest { + WireMockServer server; + + @BeforeClass(alwaysRun = true) + void beforeClass() throws IOException { + server = new WireMockServer(WireMockConfiguration.wireMockConfig() + .port(0)); + server.start(); + } + + @AfterClass(alwaysRun = true) + void afterClass() { + if (server != null) { + server.stop(); + } + } + + static class TestClientRequest extends ClientRequest { + public TestClientRequest(URI uri, ClientConfig clientConfig, PropertiesDelegate propertiesDelegate) { + super(uri, clientConfig, propertiesDelegate); + } + } + + @Test + public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, ExecutionException, InterruptedException { + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("once") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("next") + .willReturn(aResponse() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]"))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("once") + .whenScenarioStateIs("next") + .willSetStateTo("retried") + .willReturn(aResponse().withStatus(500))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + + int requestTimeout = 500; + + @Cleanup("shutdownNow") + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + Executor delayedExecutor = runnable -> { + scheduledExecutor.schedule(runnable, requestTimeout, TimeUnit.MILLISECONDS); + }; + @Cleanup + AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout, + requestTimeout, 0, conf) { + @Override + protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) { + // delay the response to simulate a timeout + return super.oneShot(host, request) + .thenApplyAsync(response -> { + return response; + }, delayedExecutor); + } + }; + + JerseyClient jerseyClient = JerseyClientBuilder.createClient(); + ClientConfig clientConfig = jerseyClient.getConfiguration(); + PropertiesDelegate propertiesDelegate = new MapPropertiesDelegate(); + URI requestUri = URI.create("http://localhost:" + server.port() + "/admin/v2/clusters"); + ClientRequest request = new TestClientRequest(requestUri, clientConfig, propertiesDelegate); + request.setMethod("GET"); + CompletableFuture<ClientResponse> future = new CompletableFuture<>(); + connector.apply(request, new AsyncConnectorCallback() { + @Override + public void response(ClientResponse response) { + future.complete(response); + } + + @Override + public void failure(Throwable failure) { + future.completeExceptionally(failure); + } + }); + Thread.sleep(2 * requestTimeout); + String scenarioState = + server.getAllScenarios().getScenarios().stream().filter(scenario -> "once".equals(scenario.getName())) + .findFirst().get().getState(); + assertEquals(scenarioState, "next"); + assertTrue(future.isCompletedExceptionally()); + } +} \ No newline at end of file diff --git a/pulsar-client-admin/src/test/resources/log4j2.xml b/pulsar-client-admin/src/test/resources/log4j2.xml new file mode 100644 index 00000000000..9b57b450ffa --- /dev/null +++ b/pulsar-client-admin/src/test/resources/log4j2.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<Configuration xmlns="http://logging.apache.org/log4j/2.0/config" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd"> + <Appenders> + <!-- setting follow="true" is required for using ConsoleCaptor to validate log messages --> + <Console name="CONSOLE" target="SYSTEM_OUT" follow="true"> + <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%c{1}] - %m%n"/> + </Console> + </Appenders> + <Loggers> +<!-- <Logger name="org.apache.pulsar.broker.service.persistent.PersistentTopic" level="DEBUG" additivity="false">--> +<!-- <AppenderRef ref="CONSOLE" />--> +<!-- </Logger>--> + + <Root level="INFO"> + <AppenderRef ref="CONSOLE"/> + </Root> + <Logger name="org.apache.pulsar.client.admin" level="DEBUG" /> + </Loggers> +</Configuration> diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java index 9f2a97a4156..3659ade9fca 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java @@ -44,10 +44,10 @@ public abstract class CmdBase { private IUsageFormatter usageFormatter; /** - * Default read timeout in milliseconds. - * Used if not found from configuration data in {@link #getReadTimeoutMs()} + * Default request timeout in milliseconds. + * Used if not found from configuration data in {@link #getRequestTimeoutMs()} */ - private static final long DEFAULT_READ_TIMEOUT_MILLIS = 60000; + private static final long DEFAULT_REQUEST_TIMEOUT_MILLIS = 60000; @Parameter(names = { "--help", "-h" }, help = true, hidden = true) private boolean help = false; @@ -130,17 +130,17 @@ public abstract class CmdBase { return admin; } - protected long getReadTimeoutMs() { + protected long getRequestTimeoutMs() { PulsarAdmin pulsarAdmin = getAdmin(); if (pulsarAdmin instanceof PulsarAdminImpl) { - return ((PulsarAdminImpl) pulsarAdmin).getClientConfigData().getReadTimeoutMs(); + return ((PulsarAdminImpl) pulsarAdmin).getClientConfigData().getRequestTimeoutMs(); } - return DEFAULT_READ_TIMEOUT_MILLIS; + return DEFAULT_REQUEST_TIMEOUT_MILLIS; } protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException { try { - return executor.get().get(getReadTimeoutMs(), TimeUnit.MILLISECONDS); + return executor.get().get(getRequestTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new PulsarAdminException(e);
