This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6d140af3fdb5edec016eca8f361d552e108d7e45
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Aug 8 15:04:04 2024 +0300

    [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder (#22541)
    
    (cherry picked from commit 3e7dbb4957bf5daae59127cd66e4da3802072853)
    (cherry picked from commit 4c480fd4ec16af1ffeac5898b60415a503f6ed02)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |  20 +-
 distribution/shell/src/assemble/LICENSE.bin.txt    |  13 +-
 pom.xml                                            |   9 +-
 .../pulsar/client/admin/PulsarAdminBuilder.java    |  26 ++
 pulsar-client-admin-shaded/pom.xml                 |   5 +
 .../client/admin/internal/FunctionsImpl.java       |  70 ++--
 .../pulsar/client/admin/internal/PackagesImpl.java |  68 ++--
 .../admin/internal/PulsarAdminBuilderImpl.java     |  24 ++
 .../client/admin/internal/PulsarAdminImpl.java     |   8 +-
 .../pulsar/client/admin/internal/SinksImpl.java    |  13 +-
 .../pulsar/client/admin/internal/SourcesImpl.java  |  13 +-
 .../admin/internal/http/AsyncHttpConnector.java    | 351 +++++++++++++++------
 .../internal/http/AsyncHttpRequestExecutor.java    |  48 +++
 .../admin/internal/PulsarAdminBuilderImplTest.java |   2 +
 .../internal/http/AsyncHttpConnectorTest.java      | 200 ++++++++++++
 pulsar-client-all/pom.xml                          |   5 +
 .../apache/pulsar/client/api/ClientBuilder.java    |   2 +
 pulsar-client-shaded/pom.xml                       |   5 +
 .../apache/pulsar/client/impl/ConnectionPool.java  |   2 +-
 .../client/impl/conf/ClientConfigurationData.java  |   4 +-
 .../pulsar/client/impl/ClientBuilderImplTest.java  |   2 +-
 pulsar-common/pom.xml                              |   5 +
 pulsar-sql/presto-distribution/LICENSE             |  20 +-
 pulsar-sql/presto-distribution/pom.xml             |   2 +-
 24 files changed, 687 insertions(+), 230 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index b5764d7b87e..c7953eb99f7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -554,6 +554,8 @@ The Apache Software License, Version 2.0
     - com.rabbitmq-amqp-client-5.5.3.jar
   * RoaringBitmap
     - org.roaringbitmap-RoaringBitmap-0.9.44.jar
+  * Spotify completable-futures
+    - com.spotify-completable-futures-0.3.6.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
@@ -595,15 +597,15 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
     - org.glassfish.hk2-osgi-resource-locator-1.0.3.jar
     - org.glassfish.hk2.external-aopalliance-repackaged-2.6.1.jar
  * Jersey
-    - org.glassfish.jersey.containers-jersey-container-servlet-2.41.jar
-    - org.glassfish.jersey.containers-jersey-container-servlet-core-2.41.jar
-    - org.glassfish.jersey.core-jersey-client-2.41.jar
-    - org.glassfish.jersey.core-jersey-common-2.41.jar
-    - org.glassfish.jersey.core-jersey-server-2.41.jar
-    - org.glassfish.jersey.ext-jersey-entity-filtering-2.41.jar
-    - org.glassfish.jersey.media-jersey-media-json-jackson-2.41.jar
-    - org.glassfish.jersey.media-jersey-media-multipart-2.41.jar
-    - org.glassfish.jersey.inject-jersey-hk2-2.41.jar
+    - org.glassfish.jersey.containers-jersey-container-servlet-2.42.jar
+    - org.glassfish.jersey.containers-jersey-container-servlet-core-2.42.jar
+    - org.glassfish.jersey.core-jersey-client-2.42.jar
+    - org.glassfish.jersey.core-jersey-common-2.42.jar
+    - org.glassfish.jersey.core-jersey-server-2.42.jar
+    - org.glassfish.jersey.ext-jersey-entity-filtering-2.42.jar
+    - org.glassfish.jersey.media-jersey-media-json-jackson-2.42.jar
+    - org.glassfish.jersey.media-jersey-media-multipart-2.42.jar
+    - org.glassfish.jersey.inject-jersey-hk2-2.42.jar
  * Mimepull -- org.jvnet.mimepull-mimepull-1.9.15.jar
 
 Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 24a57ecd2b7..9104aa9b90b 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -404,6 +404,7 @@ The Apache Software License, Version 2.0
   * Apache Avro
     - avro-1.11.3.jar
     - avro-protobuf-1.11.3.jar
+ * Spotify completable-futures -- completable-futures-0.3.6.jar
 
 BSD 3-clause "New" or "Revised" License
  * JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
@@ -429,12 +430,12 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
     - aopalliance-repackaged-2.6.1.jar
     - osgi-resource-locator-1.0.3.jar
  * Jersey
-    - jersey-client-2.41.jar
-    - jersey-common-2.41.jar
-    - jersey-entity-filtering-2.41.jar
-    - jersey-media-json-jackson-2.41.jar
-    - jersey-media-multipart-2.41.jar
-    - jersey-hk2-2.41.jar
+    - jersey-client-2.42.jar
+    - jersey-common-2.42.jar
+    - jersey-entity-filtering-2.42.jar
+    - jersey-media-json-jackson-2.42.jar
+    - jersey-media-multipart-2.42.jar
+    - jersey-hk2-2.42.jar
  * Mimepull -- mimepull-1.9.15.jar
 
 Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
diff --git a/pom.xml b/pom.xml
index 0c57b6de246..6dd5459a206 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <netty.version>4.1.100.Final</netty.version>
     <jetty.version>9.4.54.v20240208</jetty.version>
     <conscrypt.version>2.5.2</conscrypt.version>
-    <jersey.version>2.41</jersey.version>
+    <jersey.version>2.42</jersey.version>
     <athenz.version>1.10.50</athenz.version>
     <prometheus.version>0.16.0</prometheus.version>
     <vertx.version>3.9.8</vertx.version>
@@ -234,6 +234,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <disruptor.version>3.4.3</disruptor.version>
     <zstd-jni.version>1.5.2-3</zstd-jni.version>
     <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
+    <completable-futures.version>0.3.6</completable-futures.version>
 
     <!-- test dependencies -->
     <testcontainers.version>1.17.6</testcontainers.version>
@@ -597,6 +598,12 @@ flexible messaging model and an intuitive client 
API.</description>
         <version>${bookkeeper.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>com.spotify</groupId>
+        <artifactId>completable-futures</artifactId>
+        <version>${completable-futures.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.rocksdb</groupId>
         <artifactId>rocksdbjni</artifactId>
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
index 60d4c2dbc71..a23dee741fd 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java
@@ -327,4 +327,30 @@ public interface PulsarAdminBuilder {
      */
     PulsarAdminBuilder setContextClassLoader(ClassLoader 
clientBuilderClassLoader);
 
+
+    /**
+     * Configures the maximum number of connections that the client library 
will establish with a single host.
+     * <p>
+     * By default, the connection pool maintains up to 16 connections to a 
single host. This method allows you to
+     * modify this default behavior and limit the number of connections.
+     * <p>
+     * This setting can be useful in scenarios where you want to limit the 
resources used by the client library,
+     * or control the level of parallelism for operations so that a single 
client does not overwhelm
+     * the Pulsar cluster with too many concurrent connections.
+     *
+     * @param maxConnectionsPerHost the maximum number of connections to 
establish per host. Set to <= 0 to disable
+     *                             the limit.
+     * @return the PulsarAdminBuilder instance, allowing for method chaining
+     */
+    PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost);
+
+    /**
+     * Sets the maximum idle time for a pooled connection. If a connection is 
idle for more than the specified
+     * number of seconds, it will be released back to the connection pool.
+     * Defaults to 25 seconds.
+     *
+     * @param connectionMaxIdleSeconds the maximum idle time, in seconds, for 
a pooled connection
+     * @return the PulsarAdminBuilder instance
+     */
+    PulsarAdminBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
 }
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index f259b6076b8..a8487109886 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -123,6 +123,7 @@
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>com.google.guava:guava</include>
                   <include>com.google.code.gson:gson</include>
+                  <include>com.spotify:completable-futures</include>
                   <include>com.fasterxml.jackson.core</include>
                   <include>io.netty:*</include>
                   <include>org.apache.pulsar:pulsar-common</include>
@@ -188,6 +189,10 @@
                     <exclude>com.google.protobuf.*</exclude>
                   </excludes>
                 </relocation>
+                <relocation>
+                  <pattern>com.spotify.futures</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>com.fasterxml.jackson</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 750f642f365..e449c719503 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -22,7 +22,6 @@ import static org.asynchttpclient.Dsl.get;
 import static org.asynchttpclient.Dsl.post;
 import static org.asynchttpclient.Dsl.put;
 import com.google.gson.Gson;
-import io.netty.handler.codec.http.HttpHeaders;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -41,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
@@ -54,10 +54,8 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.asynchttpclient.AsyncHandler;
-import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncCompletionHandlerBase;
 import org.asynchttpclient.HttpResponseBodyPart;
-import org.asynchttpclient.HttpResponseStatus;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.ByteArrayPart;
 import org.asynchttpclient.request.body.multipart.FilePart;
@@ -70,12 +68,14 @@ import 
org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 public class FunctionsImpl extends ComponentResource implements Functions {
 
     private final WebTarget functions;
-    private final AsyncHttpClient asyncHttpClient;
+    private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
 
-    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+    public FunctionsImpl(WebTarget web, Authentication auth,
+                         AsyncHttpRequestExecutor asyncHttpRequestExecutor,
+                         long requestTimeoutMs) {
         super(auth, requestTimeoutMs);
         this.functions = web.path("/admin/v3/functions");
-        this.asyncHttpClient = asyncHttpClient;
+        this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
     }
 
     @Override
@@ -277,8 +277,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 // If the function code is built in, we don't need to submit 
here
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
-            asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build())
-                    .toCompletableFuture()
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build())
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
                             future.completeExceptionally(
@@ -369,8 +368,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
 
-            asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build())
-                    .toCompletableFuture()
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build())
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
                             future.completeExceptionally(
@@ -570,7 +568,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                     .addBodyPart(new FilePart("data", new File(sourceFile), 
MediaType.APPLICATION_OCTET_STREAM))
                     .addBodyPart(new StringPart("path", path, 
MediaType.TEXT_PLAIN));
 
-            asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build()).toCompletableFuture()
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build())
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
                             future.completeExceptionally(
@@ -633,55 +631,31 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
 
             RequestBuilder builder = get(target.getUri().toASCIIString());
 
-            CompletableFuture<HttpResponseStatus> statusFuture =
-                    asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build(),
-                        new AsyncHandler<HttpResponseStatus>() {
-                            private HttpResponseStatus status;
-
-                            @Override
-                            public State onStatusReceived(HttpResponseStatus 
responseStatus) throws Exception {
-                                status = responseStatus;
-                                if (status.getStatusCode() != 
Response.Status.OK.getStatusCode()) {
-                                    return State.ABORT;
-                                }
-                                return State.CONTINUE;
-                            }
-
-                            @Override
-                            public State onHeadersReceived(HttpHeaders 
headers) throws Exception {
-                                return State.CONTINUE;
-                            }
+            CompletableFuture<org.asynchttpclient.Response> responseFuture =
+                    
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build(),
+                            () -> new AsyncCompletionHandlerBase() {
 
                             @Override
                             public State 
onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
                                 os.write(bodyPart.getBodyByteBuffer());
                                 return State.CONTINUE;
                             }
+                        });
 
-                            @Override
-                            public HttpResponseStatus onCompleted() throws 
Exception {
-                                return status;
-                            }
-
-                            @Override
-                            public void onThrowable(Throwable t) {
-                            }
-                        }).toCompletableFuture();
-
-            statusFuture
-                    .whenComplete((status, throwable) -> {
+            responseFuture
+                    .whenComplete((response, throwable) -> {
                         try {
                             os.close();
                         } catch (IOException e) {
                             future.completeExceptionally(getApiException(e));
                         }
                     })
-                    .thenAccept(status -> {
-                        if (status.getStatusCode() < 200 || 
status.getStatusCode() >= 300) {
+                    .thenAccept(response -> {
+                        if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
                             future.completeExceptionally(
                                     getApiException(Response
-                                            .status(status.getStatusCode())
-                                            .entity(status.getStatusText())
+                                            .status(response.getStatusCode())
+                                            .entity(response.getStatusText())
                                             .build()));
                         } else {
                             future.complete(null);
@@ -778,7 +752,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                             
.path("state").path(state.getKey()).getUri().toASCIIString());
             builder.addBodyPart(new StringPart("state", 
ObjectMapperFactory.getThreadLocal()
                     .writeValueAsString(state), MediaType.APPLICATION_JSON));
-            asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
@@ -818,7 +792,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                             .addBodyPart(new ByteArrayPart("functionMetaData", 
functionMetaData))
                     .addBodyPart(new StringPart("delete", 
Boolean.toString(delete)));
 
-            asyncHttpClient.executeRequest(addAuthHeaders(functions, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(functions, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index c2b0f6b7be9..9a191c92bd2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.admin.internal;
 
 import static org.asynchttpclient.Dsl.get;
 import com.google.gson.Gson;
-import io.netty.handler.codec.http.HttpHeaders;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -36,15 +35,14 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.client.admin.Packages;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.packages.management.core.common.PackageName;
-import org.asynchttpclient.AsyncHandler;
-import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncCompletionHandlerBase;
 import org.asynchttpclient.Dsl;
 import org.asynchttpclient.HttpResponseBodyPart;
-import org.asynchttpclient.HttpResponseStatus;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
@@ -55,11 +53,12 @@ import 
org.asynchttpclient.request.body.multipart.StringPart;
 public class PackagesImpl extends ComponentResource implements Packages {
 
     private final WebTarget packages;
-    private final AsyncHttpClient httpClient;
+    private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
 
-    public PackagesImpl(WebTarget webTarget, Authentication auth, 
AsyncHttpClient client, long requestTimeoutMs) {
+    public PackagesImpl(WebTarget webTarget, Authentication auth, 
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
+                        long requestTimeoutMs) {
         super(auth, requestTimeoutMs);
-        this.httpClient = client;
+        this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
         this.packages = webTarget.path("/admin/v3/packages");
     }
 
@@ -98,7 +97,7 @@ public class PackagesImpl extends ComponentResource 
implements Packages {
                 
.post(packages.path(PackageName.get(packageName).toRestPath()).getUri().toASCIIString())
                 .addBodyPart(new FilePart("file", new File(path), 
MediaType.APPLICATION_OCTET_STREAM))
                 .addBodyPart(new StringPart("metadata", new 
Gson().toJson(metadata), MediaType.APPLICATION_JSON));
-            httpClient.executeRequest(addAuthHeaders(packages, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(packages, 
builder).build())
                 .toCompletableFuture()
                 .thenAccept(response -> {
                     if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
@@ -138,55 +137,30 @@ public class PackagesImpl extends ComponentResource 
implements Packages {
             FileChannel os = new 
FileOutputStream(destinyPath.toFile()).getChannel();
             RequestBuilder builder = get(webTarget.getUri().toASCIIString());
 
-            CompletableFuture<HttpResponseStatus> statusFuture =
-                httpClient.executeRequest(addAuthHeaders(webTarget, 
builder).build(),
-                    new AsyncHandler<HttpResponseStatus>() {
-                        private HttpResponseStatus status;
+            CompletableFuture<org.asynchttpclient.Response> responseFuture =
+                
asyncHttpRequestExecutor.executeRequest(addAuthHeaders(webTarget, 
builder).build(),
+                        () -> new AsyncCompletionHandlerBase() {
 
-                        @Override
-                        public State onStatusReceived(HttpResponseStatus 
httpResponseStatus) throws Exception {
-                            status = httpResponseStatus;
-                            if (status.getStatusCode() != 
Response.Status.OK.getStatusCode()) {
-                                return State.ABORT;
+                            @Override
+                            public State 
onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+                                os.write(bodyPart.getBodyByteBuffer());
+                                return State.CONTINUE;
                             }
-                            return State.CONTINUE;
-                        }
-
-                        @Override
-                        public State onHeadersReceived(HttpHeaders 
httpHeaders) throws Exception {
-                            return State.CONTINUE;
-                        }
-
-                        @Override
-                        public State onBodyPartReceived(HttpResponseBodyPart 
httpResponseBodyPart) throws Exception {
-                            os.write(httpResponseBodyPart.getBodyByteBuffer());
-                            return State.CONTINUE;
-                        }
-
-                        @Override
-                        public void onThrowable(Throwable throwable) {
-                            // we don't need to handle that throwable and use 
the returned future to handle it.
-                        }
-
-                        @Override
-                        public HttpResponseStatus onCompleted() throws 
Exception {
-                            return status;
-                        }
-                    }).toCompletableFuture();
-            statusFuture
-                .whenComplete((status, throwable) -> {
+                    });
+            responseFuture
+                .whenComplete((response, throwable) -> {
                     try {
                         os.close();
                     } catch (IOException e) {
                         
future.completeExceptionally(getApiException(throwable));
                     }
                 })
-                .thenAccept(status -> {
-                    if (status.getStatusCode() < 200 || status.getStatusCode() 
>= 300) {
+                .thenAccept(response -> {
+                    if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
                         future.completeExceptionally(
                             getApiException(Response
-                                .status(status.getStatusCode())
-                                .entity(status.getStatusText())
+                                .status(response.getStatusCode())
+                                .entity(response.getStatusText())
                                 .build()));
                     } else {
                         future.complete(null);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 1bcfadc8474..ec57112d762 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -46,6 +46,7 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
 
     public PulsarAdminBuilderImpl() {
         this.conf = new ClientConfigurationData();
+        this.conf.setConnectionsPerBroker(16);
     }
 
     private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
@@ -61,6 +62,15 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
     public PulsarAdminBuilder loadConf(Map<String, Object> config) {
         conf = ConfigurationDataUtils.loadData(config, conf, 
ClientConfigurationData.class);
         setAuthenticationFromPropsIfAvailable(conf);
+        // in ClientConfigurationData, the maxConnectionsPerHost maps to 
connectionsPerBroker
+        if (config.containsKey("maxConnectionsPerHost")) {
+            Object maxConnectionsPerHostObj = 
config.get("maxConnectionsPerHost");
+            if (maxConnectionsPerHostObj instanceof Integer) {
+                maxConnectionsPerHost((Integer) maxConnectionsPerHostObj);
+            } else {
+                
maxConnectionsPerHost(Integer.parseInt(maxConnectionsPerHostObj.toString()));
+            }
+        }
         return this;
     }
 
@@ -227,4 +237,18 @@ public class PulsarAdminBuilderImpl implements 
PulsarAdminBuilder {
         this.clientBuilderClassLoader = clientBuilderClassLoader;
         return this;
     }
+
+    @Override
+    public PulsarAdminBuilder maxConnectionsPerHost(int maxConnectionsPerHost) 
{
+        // reuse the same configuration as the client, however for the admin 
client, the connection
+        // is usually established to a cluster address and not to a broker 
address
+        this.conf.setConnectionsPerBroker(maxConnectionsPerHost);
+        return this;
+    }
+
+    @Override
+    public PulsarAdminBuilder connectionMaxIdleSeconds(int 
connectionMaxIdleSeconds) {
+        this.conf.setConnectionMaxIdleSeconds(connectionMaxIdleSeconds);
+        return this;
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index d78394ef936..f944289d8d8 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -168,13 +168,13 @@ public class PulsarAdminImpl implements PulsarAdmin {
         this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth, 
requestTimeoutMs);
         this.resourceQuotas = new ResourceQuotasImpl(root, auth, 
requestTimeoutMs);
         this.lookups = new LookupImpl(root, auth, useTls, requestTimeoutMs, 
topics);
-        this.functions = new FunctionsImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
-        this.sources = new SourcesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
-        this.sinks = new SinksImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.functions = new FunctionsImpl(root, auth, asyncHttpConnector, 
requestTimeoutMs);
+        this.sources = new SourcesImpl(root, auth, asyncHttpConnector, 
requestTimeoutMs);
+        this.sinks = new SinksImpl(root, auth, asyncHttpConnector, 
requestTimeoutMs);
         this.worker = new WorkerImpl(root, auth, requestTimeoutMs);
         this.schemas = new SchemasImpl(root, auth, requestTimeoutMs);
         this.bookies = new BookiesImpl(root, auth, requestTimeoutMs);
-        this.packages = new PackagesImpl(root, auth, 
asyncHttpConnector.getHttpClient(), requestTimeoutMs);
+        this.packages = new PackagesImpl(root, auth, asyncHttpConnector, 
requestTimeoutMs);
         this.transactions = new TransactionsImpl(root, auth, requestTimeoutMs);
 
         if (originalCtxLoader != null) {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 79f52f8a669..59d5e1f3736 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Sink;
 import org.apache.pulsar.client.admin.Sinks;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.UpdateOptionsImpl;
@@ -42,7 +43,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
@@ -53,12 +53,13 @@ import 
org.glassfish.jersey.media.multipart.FormDataMultiPart;
 public class SinksImpl extends ComponentResource implements Sinks, Sink {
 
     private final WebTarget sink;
-    private final AsyncHttpClient asyncHttpClient;
+    private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
 
-    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+    public SinksImpl(WebTarget web, Authentication auth, 
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
+                     long requestTimeoutMs) {
         super(auth, requestTimeoutMs);
         this.sink = web.path("/admin/v3/sink");
-        this.asyncHttpClient = asyncHttpClient;
+        this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
     }
 
     @Override
@@ -212,7 +213,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 // If the function code is built in, we don't need to submit 
here
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
-            asyncHttpClient.executeRequest(addAuthHeaders(sink, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
@@ -301,7 +302,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 // If the function code is built in, we don't need to submit 
here
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
-            asyncHttpClient.executeRequest(addAuthHeaders(sink, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(sink, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index bfd5c86ac1b..2dc27c829c8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Source;
 import org.apache.pulsar.client.admin.Sources;
+import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.functions.UpdateOptionsImpl;
@@ -41,7 +42,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
@@ -52,12 +52,13 @@ import 
org.glassfish.jersey.media.multipart.FormDataMultiPart;
 public class SourcesImpl extends ComponentResource implements Sources, Source {
 
     private final WebTarget source;
-    private final AsyncHttpClient asyncHttpClient;
+    private final AsyncHttpRequestExecutor asyncHttpRequestExecutor;
 
-    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient, long requestTimeoutMs) {
+    public SourcesImpl(WebTarget web, Authentication auth, 
AsyncHttpRequestExecutor asyncHttpRequestExecutor,
+                       long requestTimeoutMs) {
         super(auth, requestTimeoutMs);
         this.source = web.path("/admin/v3/source");
-        this.asyncHttpClient = asyncHttpClient;
+        this.asyncHttpRequestExecutor = asyncHttpRequestExecutor;
     }
 
     @Override
@@ -196,7 +197,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 // If the function code is built in, we don't need to submit 
here
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
-            asyncHttpClient.executeRequest(addAuthHeaders(source, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
@@ -274,7 +275,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 // If the function code is built in, we don't need to submit 
here
                 builder.addBodyPart(new FilePart("data", new File(fileName), 
MediaType.APPLICATION_OCTET_STREAM));
             }
-            asyncHttpClient.executeRequest(addAuthHeaders(source, 
builder).build())
+            asyncHttpRequestExecutor.executeRequest(addAuthHeaders(source, 
builder).build())
                     .toCompletableFuture()
                     .thenAccept(response -> {
                         if (response.getStatusCode() < 200 || 
response.getStatusCode() >= 300) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index cfaf4aa5e4d..751110798e2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -18,6 +18,17 @@
  */
 package org.apache.pulsar.client.admin.internal.http;
 
+import static org.asynchttpclient.util.HttpConstants.Methods.GET;
+import static org.asynchttpclient.util.HttpConstants.Methods.HEAD;
+import static org.asynchttpclient.util.HttpConstants.Methods.OPTIONS;
+import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.FOUND_302;
+import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.MOVED_PERMANENTLY_301;
+import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.PERMANENT_REDIRECT_308;
+import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTHER_303;
+import static 
org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307;
+import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
+import com.spotify.futures.ConcurrencyReducer;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.ssl.SslContext;
@@ -27,9 +38,12 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.security.GeneralSecurityException;
 import java.time.Duration;
+import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,32 +51,39 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLContext;
+import javax.ws.rs.ProcessingException;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
+import org.apache.pulsar.client.impl.ServiceNameResolver;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.util.WithSNISslEngineFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
+import org.asynchttpclient.AsyncCompletionHandlerBase;
+import org.asynchttpclient.AsyncHandler;
 import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
 import org.asynchttpclient.BoundRequestBuilder;
 import org.asynchttpclient.DefaultAsyncHttpClient;
 import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.ListenableFuture;
 import org.asynchttpclient.Request;
 import org.asynchttpclient.Response;
+import org.asynchttpclient.SslEngineFactory;
 import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
 import org.asynchttpclient.netty.ssl.JsseSslEngineFactory;
+import org.asynchttpclient.uri.Uri;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.client.ClientRequest;
 import org.glassfish.jersey.client.ClientResponse;
@@ -73,16 +94,18 @@ import org.glassfish.jersey.client.spi.Connector;
  * Customized Jersey client connector with multi-host support.
  */
 @Slf4j
-public class AsyncHttpConnector implements Connector {
+public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor 
{
     private static final TimeoutException REQUEST_TIMEOUT_EXCEPTION =
             FutureUtil.createTimeoutException("Request timeout", 
AsyncHttpConnector.class, "retryOrTimeout(...)");
+    private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HOST = 10000;
     @Getter
     private final AsyncHttpClient httpClient;
     private final Duration requestTimeout;
     private final int maxRetries;
-    private final PulsarServiceNameResolver serviceNameResolver;
+    private final ServiceNameResolver serviceNameResolver;
     private final ScheduledExecutorService delayer = 
Executors.newScheduledThreadPool(1,
             new DefaultThreadFactory("delayer"));
+    private final Map<String, ConcurrencyReducer<Response>> 
concurrencyReducers = new ConcurrentHashMap<>();
 
     public AsyncHttpConnector(Client client, ClientConfigurationData conf, int 
autoCertRefreshTimeSeconds) {
         this((int) 
client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
@@ -96,9 +119,46 @@ public class AsyncHttpConnector implements Connector {
     public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
                               int requestTimeoutMs,
                               int autoCertRefreshTimeSeconds, 
ClientConfigurationData conf) {
+        Validate.notEmpty(conf.getServiceUrl(), "Service URL is not provided");
+        serviceNameResolver = new PulsarServiceNameResolver();
+        String serviceUrl = conf.getServiceUrl();
+        serviceNameResolver.updateServiceUrl(serviceUrl);
+        AsyncHttpClientConfig asyncHttpClientConfig =
+                createAsyncHttpClientConfig(conf, connectTimeoutMs, 
readTimeoutMs, requestTimeoutMs,
+                        autoCertRefreshTimeSeconds);
+        httpClient = createAsyncHttpClient(asyncHttpClientConfig);
+        this.requestTimeout = requestTimeoutMs > 0 ? 
Duration.ofMillis(requestTimeoutMs) : null;
+        this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
+    }
+
+    private AsyncHttpClientConfig 
createAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs,
+                                                              int 
readTimeoutMs,
+                                                              int 
requestTimeoutMs, int autoCertRefreshTimeSeconds)
+            throws GeneralSecurityException, IOException {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
+        configureAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, 
requestTimeoutMs, confBuilder);
+        if (conf.getServiceUrl().startsWith("https://")) {
+            configureAsyncHttpClientSslEngineFactory(conf, 
autoCertRefreshTimeSeconds, confBuilder);
+        }
+        AsyncHttpClientConfig asyncHttpClientConfig = confBuilder.build();
+        return asyncHttpClientConfig;
+    }
+
+    private void configureAsyncHttpClientConfig(ClientConfigurationData conf, 
int connectTimeoutMs, int readTimeoutMs,
+                                                int requestTimeoutMs,
+                                                
DefaultAsyncHttpClientConfig.Builder confBuilder) {
+        if (conf.getConnectionsPerBroker() > 0) {
+            
confBuilder.setMaxConnectionsPerHost(conf.getConnectionsPerBroker());
+            // Use the request timeout value for acquireFreeChannelTimeout so 
that we don't need to add
+            // yet another configuration property. When the ConcurrencyReducer 
is in use, it shouldn't be necessary to
+            // wait for a free channel since the ConcurrencyReducer will queue 
the requests.
+            
confBuilder.setAcquireFreeChannelTimeout(conf.getRequestTimeoutMs());
+        }
+        if (conf.getConnectionMaxIdleSeconds() > 0) {
+            
confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 
1000);
+        }
         confBuilder.setUseProxyProperties(true);
-        confBuilder.setFollowRedirect(true);
+        confBuilder.setFollowRedirect(false);
         confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
         confBuilder.setConnectTimeout(connectTimeoutMs);
         confBuilder.setReadTimeout(readTimeoutMs);
@@ -114,75 +174,75 @@ public class AsyncHttpConnector implements Connector {
                        && super.keepAlive(remoteAddress, ahcRequest, request, 
response);
             }
         });
+        
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
+    }
 
-        serviceNameResolver = new PulsarServiceNameResolver();
-        if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) {
-            serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
-            if (conf.getServiceUrl().startsWith("https://")) {
-                // Set client key and certificate if available
-                AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
-
-                if (conf.isUseKeyStoreTls()) {
-                    KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() :
-                            new KeyStoreParams(conf.getTlsKeyStoreType(), 
conf.getTlsKeyStorePath(),
-                                    conf.getTlsKeyStorePassword());
-
-                    final SSLContext sslCtx = 
KeyStoreSSLContext.createClientSslContext(
-                            conf.getSslProvider(),
-                            params.getKeyStoreType(),
-                            params.getKeyStorePath(),
-                            params.getKeyStorePassword(),
-                            conf.isTlsAllowInsecureConnection(),
-                            conf.getTlsTrustStoreType(),
-                            conf.getTlsTrustStorePath(),
-                            conf.getTlsTrustStorePassword(),
-                            conf.getTlsCiphers(),
-                            conf.getTlsProtocols());
-
-                    JsseSslEngineFactory sslEngineFactory = new 
JsseSslEngineFactory(sslCtx);
-                    confBuilder.setSslEngineFactory(sslEngineFactory);
-                } else {
-                    SslProvider sslProvider = null;
-                    if (conf.getSslProvider() != null) {
-                        sslProvider = 
SslProvider.valueOf(conf.getSslProvider());
-                    }
-                    SslContext sslCtx = null;
-                    if (authData.hasDataForTls()) {
-                        sslCtx = authData.getTlsTrustStoreStream() == null
-                                ? 
SecurityUtility.createAutoRefreshSslContextForClient(
-                                sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(), 
authData.getTlsCerificateFilePath(),
-                                authData.getTlsPrivateKeyFilePath(), null, 
autoCertRefreshTimeSeconds, delayer)
-                                : 
SecurityUtility.createNettySslContextForClient(
-                                sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                authData.getTlsTrustStoreStream(), 
authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey(),
-                                conf.getTlsCiphers(),
-                                conf.getTlsProtocols());
-                    } else {
-                        sslCtx = 
SecurityUtility.createNettySslContextForClient(
-                                sslProvider,
-                                conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(),
-                                conf.getTlsCertificateFilePath(),
-                                conf.getTlsKeyFilePath(),
-                                conf.getTlsCiphers(),
-                                conf.getTlsProtocols());
-                    }
-                    confBuilder.setSslContext(sslCtx);
-                    if (!conf.isTlsHostnameVerificationEnable()) {
-                        confBuilder.setSslEngineFactory(new 
WithSNISslEngineFactory(serviceNameResolver
-                                .resolveHostUri().getHost()));
-                    }
-                }
+    protected AsyncHttpClient createAsyncHttpClient(AsyncHttpClientConfig 
asyncHttpClientConfig) {
+        return new DefaultAsyncHttpClient(asyncHttpClientConfig);
+    }
+
+    private void 
configureAsyncHttpClientSslEngineFactory(ClientConfigurationData conf, int 
autoCertRefreshTimeSeconds,
+                                                          
DefaultAsyncHttpClientConfig.Builder confBuilder)
+            throws GeneralSecurityException, IOException {
+        // Set client key and certificate if available
+        AuthenticationDataProvider authData = 
conf.getAuthentication().getAuthData();
+
+        SslEngineFactory sslEngineFactory = null;
+        if (conf.isUseKeyStoreTls()) {
+            KeyStoreParams params = authData.hasDataForTls() ? 
authData.getTlsKeyStoreParams() :
+                    new KeyStoreParams(conf.getTlsKeyStoreType(), 
conf.getTlsKeyStorePath(),
+                            conf.getTlsKeyStorePassword());
+
+            final SSLContext sslCtx = 
KeyStoreSSLContext.createClientSslContext(
+                    conf.getSslProvider(),
+                    params.getKeyStoreType(),
+                    params.getKeyStorePath(),
+                    params.getKeyStorePassword(),
+                    conf.isTlsAllowInsecureConnection(),
+                    conf.getTlsTrustStoreType(),
+                    conf.getTlsTrustStorePath(),
+                    conf.getTlsTrustStorePassword(),
+                    conf.getTlsCiphers(),
+                    conf.getTlsProtocols());
+
+            sslEngineFactory = new JsseSslEngineFactory(sslCtx);
+            confBuilder.setSslEngineFactory(sslEngineFactory);
+        } else {
+            SslProvider sslProvider = null;
+            if (conf.getSslProvider() != null) {
+                sslProvider = SslProvider.valueOf(conf.getSslProvider());
+            }
+            SslContext sslCtx = null;
+            if (authData.hasDataForTls()) {
+                sslCtx = authData.getTlsTrustStoreStream() == null
+                        ? SecurityUtility.createAutoRefreshSslContextForClient(
+                        sslProvider,
+                        conf.isTlsAllowInsecureConnection(),
+                        conf.getTlsTrustCertsFilePath(), 
authData.getTlsCerificateFilePath(),
+                        authData.getTlsPrivateKeyFilePath(), null, 
autoCertRefreshTimeSeconds, delayer)
+                        : SecurityUtility.createNettySslContextForClient(
+                        sslProvider,
+                        conf.isTlsAllowInsecureConnection(),
+                        authData.getTlsTrustStoreStream(), 
authData.getTlsCertificates(),
+                        authData.getTlsPrivateKey(),
+                        conf.getTlsCiphers(),
+                        conf.getTlsProtocols());
+            } else {
+                sslCtx = SecurityUtility.createNettySslContextForClient(
+                        sslProvider,
+                        conf.isTlsAllowInsecureConnection(),
+                        conf.getTlsTrustCertsFilePath(),
+                        conf.getTlsCertificateFilePath(),
+                        conf.getTlsKeyFilePath(),
+                        conf.getTlsCiphers(),
+                        conf.getTlsProtocols());
+            }
+            confBuilder.setSslContext(sslCtx);
+            if (!conf.isTlsHostnameVerificationEnable()) {
+                confBuilder.setSslEngineFactory(new 
WithSNISslEngineFactory(serviceNameResolver
+                        .resolveHostUri().getHost()));
             }
-            
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
         }
-        httpClient = new DefaultAsyncHttpClient(confBuilder.build());
-        this.requestTimeout = requestTimeoutMs > 0 ? 
Duration.ofMillis(requestTimeoutMs) : null;
-        this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
     }
 
     @Override
@@ -202,9 +262,8 @@ public class AsyncHttpConnector implements Connector {
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
-            log.error(e.getMessage());
+            throw new ProcessingException(e.getCause());
         }
-        return null;
     }
 
     private URI replaceWithNew(InetSocketAddress address, URI uri) {
@@ -263,6 +322,8 @@ public class AsyncHttpConnector implements Connector {
         return resultFuture;
     }
 
+    // TODO: There are problems with this solution since AsyncHttpClient 
already contains logic to retry requests.
+    // This solution doesn't contain backoff handling.
     private <T> void retryOperation(
             final CompletableFuture<T> resultFuture,
             final Supplier<CompletableFuture<T>> operation,
@@ -274,9 +335,13 @@ public class AsyncHttpConnector implements Connector {
             operationFuture.whenComplete(
                     (t, throwable) -> {
                         if (throwable != null) {
+                            throwable = 
FutureUtil.unwrapCompletionException(throwable);
                             if (throwable instanceof CancellationException) {
                                 resultFuture.completeExceptionally(
                                         new RetryException("Operation future 
was cancelled.", throwable));
+                            } else if (throwable instanceof 
MaxRedirectException) {
+                                // don't retry on max redirect
+                                resultFuture.completeExceptionally(throwable);
                             } else {
                                 if (retries > 0) {
                                     if (log.isDebugEnabled()) {
@@ -316,7 +381,129 @@ public class AsyncHttpConnector implements Connector {
         }
     }
 
+    public static class MaxRedirectException extends Exception {
+        public MaxRedirectException(String msg) {
+            super(msg, null, true, false);
+        }
+    }
+
     protected CompletableFuture<Response> oneShot(InetSocketAddress host, 
ClientRequest request) {
+        Request preparedRequest;
+        try {
+            preparedRequest = prepareRequest(host, request);
+        } catch (IOException e) {
+            return FutureUtil.failedFuture(e);
+        }
+        return executeRequest(preparedRequest);
+    }
+
+    public CompletableFuture<Response> executeRequest(Request request) {
+        return executeRequest(request, () -> new AsyncCompletionHandlerBase());
+    }
+
+    public CompletableFuture<Response> executeRequest(Request request,
+                                                       
Supplier<AsyncHandler<Response>> handlerSupplier) {
+        return executeRequest(request, handlerSupplier, 0);
+    }
+
+    private CompletableFuture<Response> executeRequest(Request request,
+                                                       
Supplier<AsyncHandler<Response>> handlerSupplier,
+                                                       int redirectCount) {
+        int maxRedirects = httpClient.getConfig().getMaxRedirects();
+        if (redirectCount > maxRedirects) {
+            return FutureUtil.failedFuture(
+                    new MaxRedirectException("Maximum redirect reached: " + 
maxRedirects + " uri:" + request.getUri()));
+        }
+        CompletableFuture<Response> responseFuture;
+        if (httpClient.getConfig().getMaxConnectionsPerHost() > 0) {
+            String hostAndPort = request.getUri().getHost() + ":" + 
request.getUri().getPort();
+            ConcurrencyReducer<Response> responseConcurrencyReducer = 
concurrencyReducers.computeIfAbsent(hostAndPort,
+                    h -> 
ConcurrencyReducer.create(httpClient.getConfig().getMaxConnectionsPerHost(),
+                            DEFAULT_MAX_QUEUE_SIZE_PER_HOST));
+            responseFuture = responseConcurrencyReducer.add(() -> 
doExecuteRequest(request, handlerSupplier));
+        } else {
+            responseFuture = doExecuteRequest(request, handlerSupplier);
+        }
+        CompletableFuture<Response> futureWithRedirect = 
responseFuture.thenCompose(response -> {
+            if (isRedirectStatusCode(response.getStatusCode())) {
+                return executeRedirect(request, response, handlerSupplier, 
redirectCount);
+            }
+            return CompletableFuture.completedFuture(response);
+        });
+        futureWithRedirect.whenComplete((response, throwable) -> {
+            // propagate cancellation or timeout to the original response 
future
+            responseFuture.cancel(false);
+        });
+        return futureWithRedirect;
+    }
+
+    private CompletableFuture<Response> executeRedirect(Request request, 
Response response,
+                                                        
Supplier<AsyncHandler<Response>> handlerSupplier,
+                                                        int redirectCount) {
+        String originalMethod = request.getMethod();
+        int statusCode = response.getStatusCode();
+        boolean switchToGet = !originalMethod.equals(GET)
+                && !originalMethod.equals(OPTIONS) && 
!originalMethod.equals(HEAD) && (
+                statusCode == MOVED_PERMANENTLY_301 || statusCode == 
SEE_OTHER_303 || statusCode == FOUND_302);
+        boolean keepBody = statusCode == TEMPORARY_REDIRECT_307 || statusCode 
== PERMANENT_REDIRECT_308;
+        String location = response.getHeader(HttpHeaders.LOCATION);
+        Uri newUri = Uri.create(request.getUri(), location);
+        BoundRequestBuilder builder = httpClient.prepareRequest(request);
+        if (switchToGet) {
+            builder.setMethod(GET);
+        }
+        builder.setUri(newUri);
+        if (keepBody) {
+            builder.setCharset(request.getCharset());
+            if (isNonEmpty(request.getFormParams())) {
+                builder.setFormParams(request.getFormParams());
+            } else if (request.getStringData() != null) {
+                builder.setBody(request.getStringData());
+            } else if (request.getByteData() != null){
+                builder.setBody(request.getByteData());
+            } else if (request.getByteBufferData() != null) {
+                builder.setBody(request.getByteBufferData());
+            } else if (request.getBodyGenerator() != null) {
+                builder.setBody(request.getBodyGenerator());
+            } else if (isNonEmpty(request.getBodyParts())) {
+                builder.setBodyParts(request.getBodyParts());
+            }
+        } else {
+            builder.resetFormParams();
+            builder.resetNonMultipartData();
+            builder.resetMultipartData();
+            io.netty.handler.codec.http.HttpHeaders headers = new 
DefaultHttpHeaders();
+            headers.add(request.getHeaders());
+            headers.remove(HttpHeaders.CONTENT_LENGTH);
+            headers.remove(HttpHeaders.CONTENT_TYPE);
+            headers.remove(HttpHeaders.CONTENT_ENCODING);
+            builder.setHeaders(headers);
+        }
+        return executeRequest(builder.build(), handlerSupplier, redirectCount 
+ 1);
+    }
+
+    private static boolean isRedirectStatusCode(int statusCode) {
+        return statusCode == MOVED_PERMANENTLY_301 || statusCode == FOUND_302 
|| statusCode == SEE_OTHER_303
+                || statusCode == TEMPORARY_REDIRECT_307 || statusCode == 
PERMANENT_REDIRECT_308;
+    }
+
+    private CompletableFuture<Response> doExecuteRequest(Request request,
+                                                         
Supplier<AsyncHandler<Response>> handlerSupplier) {
+        ListenableFuture<Response> responseFuture =
+                httpClient.executeRequest(request, handlerSupplier.get());
+        CompletableFuture<Response> completableFuture = 
responseFuture.toCompletableFuture();
+        completableFuture.whenComplete((response, throwable) -> {
+            throwable = FutureUtil.unwrapCompletionException(throwable);
+            if (throwable != null && (throwable instanceof 
CancellationException
+                    || throwable instanceof TimeoutException)) {
+                // abort the request if the future is cancelled or timed out
+                responseFuture.abort(throwable);
+            }
+        });
+        return completableFuture;
+    }
+
+    private Request prepareRequest(InetSocketAddress host, ClientRequest 
request) throws IOException {
         ClientRequest currentRequest = new ClientRequest(request);
         URI newUri = replaceWithNew(host, currentRequest.getUri());
         currentRequest.setUri(newUri);
@@ -327,14 +514,7 @@ public class AsyncHttpConnector implements Connector {
         if (currentRequest.hasEntity()) {
             ByteArrayOutputStream outStream = new ByteArrayOutputStream();
             currentRequest.setStreamProvider(contentLength -> outStream);
-            try {
-                currentRequest.writeEntity();
-            } catch (IOException e) {
-                CompletableFuture<Response> r = new CompletableFuture<>();
-                r.completeExceptionally(e);
-                return r;
-            }
-
+            currentRequest.writeEntity();
             builder.setBody(outStream.toByteArray());
         }
 
@@ -344,16 +524,7 @@ public class AsyncHttpConnector implements Connector {
             }
         });
 
-        ListenableFuture<Response> responseFuture = builder.execute();
-        CompletableFuture<Response> completableFuture = 
responseFuture.toCompletableFuture();
-        completableFuture.whenComplete((response, throwable) -> {
-            if (throwable != null && (throwable instanceof 
CancellationException
-                    || throwable instanceof TimeoutException)) {
-                // abort the request if the future is cancelled or timed out
-                responseFuture.abort(throwable);
-            }
-        });
-        return completableFuture;
+        return builder.build();
     }
 
     @Override
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java
new file mode 100644
index 00000000000..25810b8b1cf
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal.http;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.asynchttpclient.AsyncHandler;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
+
+/**
+ * Interface for executing HTTP requests asynchronously.
+ * This is used internally in the Pulsar Admin client for executing HTTP 
requests that by-pass the Jersey client
+ * and use the AsyncHttpClient API directly.
+ */
+public interface AsyncHttpRequestExecutor {
+    /**
+     * Execute the given HTTP request asynchronously.
+     *
+     * @param request the HTTP request to execute
+     * @return a future that will be completed with the HTTP response
+     */
+    CompletableFuture<Response> executeRequest(Request request);
+    /**
+     * Execute the given HTTP request asynchronously.
+     *
+     * @param request the HTTP request to execute
+     * @param handlerSupplier a supplier for the async handler to use for the 
request
+     * @return a future that will be completed with the HTTP response
+     */
+    CompletableFuture<Response> executeRequest(Request request, 
Supplier<AsyncHandler<Response>> handlerSupplier);
+}
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index 9aac263d54e..7042f46694c 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -65,6 +65,7 @@ public class PulsarAdminBuilderImplTest {
         config.put("autoCertRefreshSeconds", 20);
         config.put("connectionTimeoutMs", 30);
         config.put("readTimeoutMs", 40);
+        config.put("maxConnectionsPerHost", 50);
         PulsarAdminBuilder adminBuilder = 
PulsarAdmin.builder().loadConf(config);
         PulsarAdminImpl admin = (PulsarAdminImpl) adminBuilder.build();
         ClientConfigurationData clientConfigData = admin.getClientConfigData();
@@ -72,6 +73,7 @@ public class PulsarAdminBuilderImplTest {
         Assert.assertEquals(clientConfigData.getAutoCertRefreshSeconds(), 20);
         Assert.assertEquals(clientConfigData.getConnectionTimeoutMs(), 30);
         Assert.assertEquals(clientConfigData.getReadTimeoutMs(), 40);
+        Assert.assertEquals(clientConfigData.getConnectionsPerBroker(), 50);
     }
 
     @Test
diff --git 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
index aee3ad48cde..e2676a996bb 100644
--- 
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
+++ 
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java
@@ -20,23 +20,34 @@ package org.apache.pulsar.client.admin.internal.http;
 
 import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
 import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.common.FileSource;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
 import com.github.tomakehurst.wiremock.stubbing.Scenario;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.Response;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientRequest;
@@ -52,10 +63,74 @@ import org.testng.annotations.Test;
 
 public class AsyncHttpConnectorTest {
     WireMockServer server;
+    ConcurrencyTestTransformer concurrencyTestTransformer = new 
ConcurrencyTestTransformer();
+
+    private static class CopyRequestBodyToResponseBodyTransformer extends 
ResponseTransformer {
+        @Override
+        public com.github.tomakehurst.wiremock.http.Response transform(
+                com.github.tomakehurst.wiremock.http.Request request,
+                com.github.tomakehurst.wiremock.http.Response response, 
FileSource fileSource, Parameters parameters) {
+            return 
com.github.tomakehurst.wiremock.http.Response.Builder.like(response)
+                    .body(request.getBodyAsString())
+                    .build();
+        }
+
+        @Override
+        public String getName() {
+            return "copy-body";
+        }
+
+        @Override
+        public boolean applyGlobally() {
+            return false;
+        }
+    }
+
+    private static class ConcurrencyTestTransformer extends 
ResponseTransformer {
+        private static final long DELAY_MS = 100;
+        private final AtomicInteger concurrencyCounter = new AtomicInteger(0);
+        private final AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+        @Override
+        public com.github.tomakehurst.wiremock.http.Response transform(
+                com.github.tomakehurst.wiremock.http.Request request,
+                com.github.tomakehurst.wiremock.http.Response response, 
FileSource fileSource, Parameters parameters) {
+            int currentCounter = concurrencyCounter.incrementAndGet();
+            maxConcurrency.updateAndGet(v -> Math.max(v, currentCounter));
+            try {
+                try {
+                    Thread.sleep(DELAY_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                return 
com.github.tomakehurst.wiremock.http.Response.Builder.like(response)
+                        .body(String.valueOf(currentCounter))
+                        .build();
+            } finally {
+                concurrencyCounter.decrementAndGet();
+            }
+        }
+
+        public int getMaxConcurrency() {
+            return maxConcurrency.get();
+        }
+
+        @Override
+        public String getName() {
+            return "concurrency-test";
+        }
+
+        @Override
+        public boolean applyGlobally() {
+            return false;
+        }
+    }
 
     @BeforeClass(alwaysRun = true)
     void beforeClass() throws IOException {
         server = new WireMockServer(WireMockConfiguration.wireMockConfig()
+                .extensions(new CopyRequestBodyToResponseBodyTransformer(), 
concurrencyTestTransformer)
+                .containerThreads(100)
                 .port(0));
         server.start();
     }
@@ -137,4 +212,129 @@ public class AsyncHttpConnectorTest {
         assertEquals(scenarioState, "next");
         assertTrue(future.isCompletedExceptionally());
     }
+
+    @Test
+    void testMaxRedirects() {
+        // Redirect to itself to test max redirects
+        server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
+                .willReturn(aResponse()
+                        .withStatus(301)
+                        .withHeader("Location", "http://localhost:" + 
server.port() + "/admin/v2/clusters")));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://localhost:" + server.port());
+
+        @Cleanup
+        AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                5000, 0, conf);
+
+        Request request = new RequestBuilder("GET")
+                .setUrl("http://localhost:" + 
"/admin/v2/clusters")
+                .build();
+
+        try {
+            connector.executeRequest(request).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
AsyncHttpConnector.MaxRedirectException);
+        } catch (InterruptedException e) {
+            fail();
+        }
+    }
+
+    @Test
+    void testRelativeRedirect() throws ExecutionException, 
InterruptedException {
+        doTestRedirect("path2");
+    }
+
+    @Test
+    void testAbsoluteRedirect() throws ExecutionException, 
InterruptedException {
+        doTestRedirect("/path2");
+    }
+
+    @Test
+    void testUrlRedirect() throws ExecutionException, InterruptedException {
+        doTestRedirect("http://localhost:" + server.port() + "/path2");
+    }
+
+    private void doTestRedirect(String location) throws InterruptedException, 
ExecutionException {
+        server.stubFor(get(urlEqualTo("/path1"))
+                .willReturn(aResponse()
+                        .withStatus(301)
+                        .withHeader("Location", location)));
+
+        server.stubFor(get(urlEqualTo("/path2"))
+                .willReturn(aResponse()
+                        .withBody("OK")));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://localhost:" + server.port());
+
+        @Cleanup
+        AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                5000, 0, conf);
+
+        Request request = new RequestBuilder("GET")
+                .setUrl("http://localhost:" + server.port() + "/path1")
+                .build();
+
+        Response response = connector.executeRequest(request).get();
+        assertEquals(response.getResponseBody(), "OK");
+    }
+
+    @Test
+    void testRedirectWithBody() throws ExecutionException, 
InterruptedException {
+        server.stubFor(post(urlEqualTo("/path1"))
+                .willReturn(aResponse()
+                        .withStatus(307)
+                        .withHeader("Location", "/path2")));
+
+        server.stubFor(post(urlEqualTo("/path2"))
+                .willReturn(aResponse()
+                        .withTransformers("copy-body")));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("http://localhost:" + server.port());
+
+        @Cleanup
+        AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                5000, 0, conf);
+
+        Request request = new RequestBuilder("POST")
+                .setUrl("http://localhost:" + server.port() + "/path1")
+                .setBody("Hello world!")
+                .build();
+
+        Response response = connector.executeRequest(request).get();
+        assertEquals(response.getResponseBody(), "Hello world!");
+    }
+
+    @Test
+    void testMaxConnections() throws ExecutionException, InterruptedException {
+        server.stubFor(post(urlEqualTo("/concurrency-test"))
+                .willReturn(aResponse()
+                        .withTransformers("concurrency-test")));
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        int maxConnections = 10;
+        conf.setConnectionsPerBroker(maxConnections);
+        conf.setServiceUrl("http://localhost:" + server.port());
+
+        @Cleanup
+        AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
+                5000, 0, conf);
+
+        Request request = new RequestBuilder("POST")
+                .setUrl("http://localhost:" + 
"/concurrency-test")
+                .build();
+
+        List<CompletableFuture<Response>> futures = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            futures.add(connector.executeRequest(request));
+        }
+        FutureUtil.waitForAll(futures).get();
+        int maxConcurrency = concurrencyTestTransformer.getMaxConcurrency();
+        assertTrue(maxConcurrency > maxConnections / 2 && maxConcurrency <= 
maxConnections,
+                "concurrency didn't get limited as expected (max: " + 
maxConcurrency + ")");
+    }
 }
\ No newline at end of file
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 330b28e38b8..666754e7bb0 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -165,6 +165,7 @@
                   <include>com.google.errorprone:*</include>
                   <include>com.google.j2objc:*</include>
                   <include>com.google.code.gson:gson</include>
+                  <include>com.spotify:completable-futures</include>
                   <include>com.fasterxml.jackson.core</include>
                   <include>com.fasterxml.jackson.module</include>
                   <include>com.fasterxml.jackson.core:jackson-core</include>
@@ -240,6 +241,10 @@
                     <exclude>com.google.protobuf.*</exclude>
                   </excludes>
                 </relocation>
+                <relocation>
+                  <pattern>com.spotify.futures</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>com.fasterxml.jackson</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 6d44f20b8ee..4635958eb5a 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -128,6 +128,8 @@ public interface ClientBuilder extends Serializable, 
Cloneable {
 
     /**
      * Release the connection if it is not used for more than {@param 
connectionMaxIdleSeconds} seconds.
+     * Defaults to 25 seconds.
+     *
      * @return the client builder instance
      */
     ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 3405f705797..f69713b692f 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -145,6 +145,7 @@
                   <include>com.google.errorprone:*</include>
                   <include>com.google.j2objc:*</include>
                   <include>com.google.code.gson:gson</include>
+                  <include>com.spotify:completable-futures</include>
                   <include>com.fasterxml.jackson.core</include>
                   <include>com.fasterxml.jackson.module</include>
                   <include>com.fasterxml.jackson.core:jackson-core</include>
@@ -203,6 +204,10 @@
                     <exclude>com.google.protobuf.*</exclude>
                   </excludes>
                 </relocation>
+                <relocation>
+                  <pattern>com.spotify.futures</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.com.spotify.futures</shadedPattern>
+                </relocation>
                 <relocation>
                   <pattern>com.fasterxml.jackson</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 0b4b7e9a3b9..f675e684d67 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
 
 public class ConnectionPool implements AutoCloseable {
 
-    public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;
+    public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 15;
 
     protected final ConcurrentHashMap<InetSocketAddress, 
ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 9c2b4b8d58a..3c5d889e0c7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.conf;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
@@ -45,6 +46,7 @@ import org.apache.pulsar.client.util.Secret;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ClientConfigurationData implements Serializable, Cloneable {
     private static final long serialVersionUID = 1L;
 
@@ -129,7 +131,7 @@ public class ClientConfigurationData implements 
Serializable, Cloneable {
             value = "Release the connection if it is not used for more than 
[connectionMaxIdleSeconds] seconds. "
                     + "If  [connectionMaxIdleSeconds] < 0, disabled the 
feature that auto release the idle connections"
     )
-    private int connectionMaxIdleSeconds = 180;
+    private int connectionMaxIdleSeconds = 25;
 
     @ApiModelProperty(
             name = "useTcpNoDelay",
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
index ca2efe1f0ad..c71a6d45c2a 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java
@@ -109,7 +109,7 @@ public class ClientBuilderImplTest {
         PulsarClient.builder().connectionMaxIdleSeconds(60);
         // test config not correct.
         try {
-            PulsarClient.builder().connectionMaxIdleSeconds(30);
+            PulsarClient.builder().connectionMaxIdleSeconds(14);
             Assert.fail();
         } catch (IllegalArgumentException e){
         }
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 1428463d844..a0f43a81aec 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -163,6 +163,11 @@
       <optional>true</optional>
     </dependency>
 
+    <dependency>
+      <groupId>com.spotify</groupId>
+      <artifactId>completable-futures</artifactId>
+    </dependency>
+
     <!-- test -->
     <dependency>
       <groupId>org.bouncycastle</groupId>
diff --git a/pulsar-sql/presto-distribution/LICENSE 
b/pulsar-sql/presto-distribution/LICENSE
index 405ca53f551..71688402a20 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -491,6 +491,8 @@ The Apache Software License, Version 2.0
     - auto-service-annotations-1.0.jar
   * AMQP
     - amqp-client-5.5.3.jar
+  * Spotify completable-futures
+    - completable-futures-0.3.6.jar
 
 Protocol Buffers License
  * Protocol Buffers
@@ -544,15 +546,15 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
    - aopalliance-repackaged-2.6.1.jar
  * Jersey
     - jaxrs-0.199.jar
-    - jersey-client-2.41.jar
-    - jersey-common-2.41.jar
-    - jersey-container-servlet-2.41.jar
-    - jersey-container-servlet-core-2.41.jar
-    - jersey-entity-filtering-2.41.jar
-    - jersey-hk2-2.41.jar
-    - jersey-media-json-jackson-2.41.jar
-    - jersey-media-multipart-2.41.jar
-    - jersey-server-2.41.jar
+    - jersey-client-2.42.jar
+    - jersey-common-2.42.jar
+    - jersey-container-servlet-2.42.jar
+    - jersey-container-servlet-core-2.42.jar
+    - jersey-entity-filtering-2.42.jar
+    - jersey-hk2-2.42.jar
+    - jersey-media-json-jackson-2.42.jar
+    - jersey-media-multipart-2.42.jar
+    - jersey-server-2.42.jar
  * JAXB
     - jaxb-api-2.3.1.jar
  * RXJava
diff --git a/pulsar-sql/presto-distribution/pom.xml 
b/pulsar-sql/presto-distribution/pom.xml
index 6f63c9a5585..e0915145894 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -33,7 +33,7 @@
 
   <properties>
     <skipBuildDistribution>false</skipBuildDistribution>
-    <jersey.version>2.41</jersey.version>
+    <jersey.version>2.42</jersey.version>
     <objenesis.version>2.6</objenesis.version>
     <objectsize.version>0.0.12</objectsize.version>
     <maven.version>3.0.5</maven.version>

Reply via email to