This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbf00bcf45d KAFKA-16093: Fix spurious REST-related warnings on Connect 
startup (#15149)
dbf00bcf45d is described below

commit dbf00bcf45d30f2c9567585f15121fe46b030d4c
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Wed Jan 10 09:03:23 2024 -0500

    KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)
    
    Reviewers: Sagar Rao <sagarmeansoc...@gmail.com>, Greg Harris 
<greg.har...@aiven.io>
---
 checkstyle/import-control.xml                      |   5 +
 .../connect/mirror/rest/MirrorRestServer.java      |  23 ++++-
 .../rest/resources/InternalMirrorResource.java     |  11 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  10 +-
 .../connect/runtime/rest/ConnectRestServer.java    |  31 ++++--
 .../connect/runtime/rest/HerderRequestHandler.java |  15 +--
 ...onnectResource.java => RestRequestTimeout.java} |  25 ++---
 .../kafka/connect/runtime/rest/RestServer.java     | 111 ++++++++++++++-------
 .../runtime/rest/resources/ConnectResource.java    |  40 --------
 .../rest/resources/ConnectorPluginsResource.java   |  19 ++--
 .../runtime/rest/resources/ConnectorsResource.java |  21 ++--
 .../rest/resources/InternalClusterResource.java    |  12 +--
 .../rest/resources/InternalConnectResource.java    |   7 +-
 .../runtime/rest/resources/LoggingResource.java    |  10 +-
 .../runtime/rest/resources/RootResource.java       |  10 +-
 .../connect/integration/BlockingConnectorTest.java |   4 +-
 .../integration/ConnectWorkerIntegrationTest.java  |   2 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |   8 +-
 .../runtime/rest/ConnectRestServerTest.java        |  35 ++++---
 .../resources/ConnectorPluginsResourceTest.java    |   5 +-
 .../rest/resources/ConnectorsResourceTest.java     |  16 +--
 .../resources/InternalConnectResourceTest.java     |   3 +-
 22 files changed, 219 insertions(+), 204 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 39a77326bde..b43840f5979 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -516,6 +516,9 @@
       <allow pkg="kafka.server" />
       <subpackage name="rest">
         <allow pkg="javax.ws.rs" />
+        <allow pkg="javax.inject" />
+        <allow pkg="org.glassfish.jersey" />
+        <allow pkg="org.glassfish.hk2" />
       </subpackage>
     </subpackage>
 
@@ -530,6 +533,8 @@
       <subpackage name="rest">
         <allow pkg="org.eclipse.jetty" />
         <allow pkg="javax.ws.rs" />
+        <allow pkg="javax.inject" />
+        <allow pkg="org.glassfish.hk2" />
         <allow pkg="javax.servlet" />
         <allow pkg="org.glassfish.jersey" />
         <allow pkg="com.fasterxml.jackson" />
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
index 7f1fe2841a3..a5abeff40ce 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
@@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+import org.glassfish.hk2.api.TypeLiteral;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer {
     }
 
     @Override
-    protected Collection<ConnectResource> regularResources() {
+    protected Collection<Class<?>> regularResources() {
         return Arrays.asList(
-                new InternalMirrorResource(herders, restClient)
+                InternalMirrorResource.class
         );
     }
 
     @Override
-    protected Collection<ConnectResource> adminResources() {
+    protected Collection<Class<?>> adminResources() {
         return Collections.emptyList();
     }
 
+    @Override
+    protected void configureRegularResources(ResourceConfig resourceConfig) {
+        resourceConfig.register(new Binder());
+    }
+
+    private class Binder extends AbstractBinder {
+        @Override
+        protected void configure() {
+            bind(herders).to(new TypeLiteral<Map<SourceAndTarget, Herder>>() { 
});
+            bind(restClient).to(RestClient.class);
+        }
+    }
+
 }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
index 8b5150f56ac..5c46bd9c6c5 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
 import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Context;
@@ -39,8 +41,13 @@ public class InternalMirrorResource extends 
InternalClusterResource {
 
     private final Map<SourceAndTarget, Herder> herders;
 
-    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, 
RestClient restClient) {
-        super(restClient);
+    @Inject
+    public InternalMirrorResource(
+            Map<SourceAndTarget, Herder> herders,
+            RestClient restClient,
+            RestRequestTimeout requestTimeout
+    ) {
+        super(restClient, requestTimeout);
         this.herders = herders;
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3a586ea6186..3024e6b3d03 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -66,10 +66,10 @@ import 
org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.Message;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -718,7 +718,7 @@ public class Worker {
                             .map(this::taskTransactionalId)
                             .collect(Collectors.toList());
                     FenceProducersOptions fencingOptions = new 
FenceProducersOptions()
-                            .timeoutMs((int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                            .timeoutMs((int) 
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
                     return admin.fenceProducers(transactionalIds, 
fencingOptions).all().whenComplete((ignored, error) -> {
                         if (error == null)
                             log.debug("Finished fencing out {} task producers 
for source connector {}", numTasks, connName);
@@ -1195,7 +1195,7 @@ public class Worker {
         Admin admin = adminFactory.apply(adminConfig);
         try {
             ListConsumerGroupOffsetsOptions listOffsetsOptions = new 
ListConsumerGroupOffsetsOptions()
-                    .timeoutMs((int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+                    .timeoutMs((int) 
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
             ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = 
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
             
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result,
 error) -> {
                 if (error != null) {
@@ -1299,7 +1299,7 @@ public class Worker {
                                     Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
         executor.submit(plugins.withClassLoader(connectorLoader, () -> {
             try {
-                Timer timer = 
time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+                Timer timer = 
time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
                 boolean isReset = offsets == null;
                 SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
                 Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
@@ -1530,7 +1530,7 @@ public class Worker {
                                       ClassLoader connectorLoader, 
Callback<Message> cb) {
         executor.submit(plugins.withClassLoader(connectorLoader, () -> {
             try {
-                Timer timer = 
time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+                Timer timer = 
time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
                 // This reads to the end of the offsets topic and can be a 
potentially time-consuming operation
                 offsetStore.start();
                 updateTimerAndCheckExpiry(timer, "Timed out while trying to 
read to the end of the offsets topic prior to modifying " +
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
index 6cef19c22b8..3adbc0f14ec 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import 
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
 import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
 import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 
 import java.util.Arrays;
@@ -45,25 +45,40 @@ public class ConnectRestServer extends RestServer {
     }
 
     @Override
-    protected Collection<ConnectResource> regularResources() {
+    protected Collection<Class<?>> regularResources() {
         return Arrays.asList(
-                new RootResource(herder),
-                new ConnectorsResource(herder, config, restClient),
-                new InternalConnectResource(herder, restClient),
-                new ConnectorPluginsResource(herder)
+                RootResource.class,
+                ConnectorsResource.class,
+                InternalConnectResource.class,
+                ConnectorPluginsResource.class
         );
     }
 
     @Override
-    protected Collection<ConnectResource> adminResources() {
+    protected Collection<Class<?>> adminResources() {
         return Arrays.asList(
-                new LoggingResource(herder)
+                LoggingResource.class
         );
     }
 
     @Override
     protected void configureRegularResources(ResourceConfig resourceConfig) {
         registerRestExtensions(herder, resourceConfig);
+        resourceConfig.register(new Binder());
+    }
+
+    private class Binder extends AbstractBinder {
+        @Override
+        protected void configure() {
+            bind(herder).to(Herder.class);
+            bind(restClient).to(RestClient.class);
+            bind(config).to(RestServerConfig.class);
+        }
+    }
+
+    @Override
+    protected void configureAdminResources(ResourceConfig resourceConfig) {
+        resourceConfig.register(new Binder());
     }
 
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
index 18c969098b7..bd57dc2c80a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
@@ -41,18 +41,11 @@ public class HerderRequestHandler {
 
     private final RestClient restClient;
 
-    private volatile long requestTimeoutMs;
+    private final RestRequestTimeout requestTimeout;
 
-    public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+    public HerderRequestHandler(RestClient restClient, RestRequestTimeout 
requestTimeout) {
         this.restClient = restClient;
-        this.requestTimeoutMs = requestTimeoutMs;
-    }
-
-    public void requestTimeoutMs(long requestTimeoutMs) {
-        if (requestTimeoutMs < 1) {
-            throw new IllegalArgumentException("REST request timeout must be 
positive");
-        }
-        this.requestTimeoutMs = requestTimeoutMs;
+        this.requestTimeout = requestTimeout;
     }
 
     /**
@@ -64,7 +57,7 @@ public class HerderRequestHandler {
      */
     public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
         try {
-            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+            return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw e.getCause();
         } catch (StagedTimeoutException e) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
similarity index 59%
copy from 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
copy to 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
index c4987150dfb..d2ce28cc472 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
@@ -14,26 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.runtime.rest.resources;
+package org.apache.kafka.connect.runtime.rest;
 
-import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.rest.RestClient;
+public interface RestRequestTimeout {
 
-import javax.ws.rs.Path;
-
-@Path("/connectors")
-public class InternalConnectResource extends InternalClusterResource {
-
-    private final Herder herder;
-
-    public InternalConnectResource(Herder herder, RestClient restClient) {
-        super(restClient);
-        this.herder = herder;
-    }
-
-    @Override
-    protected Herder herderForRequest() {
-        return herder;
-    }
+    /**
+     * @return the current timeout that should be used for REST requests, in 
milliseconds
+     */
+    long timeoutMs();
 
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 9a5bc6f5a58..f078b24420e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.CustomRequestLog;
@@ -43,6 +42,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.CrossOriginFilter;
 import org.eclipse.jetty.servlets.HeaderFilter;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.glassfish.hk2.utilities.Binder;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.servlet.ServletContainer;
@@ -59,6 +60,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -66,6 +68,13 @@ import java.util.regex.Pattern;
  * Embedded server for the REST API that provides the control plane for Kafka 
Connect workers.
  */
 public abstract class RestServer {
+
+    // TODO: This should not be so long. However, due to potentially long 
rebalances that may have to wait a full
+    // session timeout to complete, during which we cannot serve some 
requests. Ideally we could reduce this, but
+    // we need to consider all possible scenarios this could fail. It might be 
ok to fail with a timeout in rare cases,
+    // but currently a worker simply leaving the group can take this long as 
well.
+    public static final long DEFAULT_REST_REQUEST_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(90);
+
     private static final Logger log = 
LoggerFactory.getLogger(RestServer.class);
 
     // Used to distinguish between Admin connectors and regular REST API 
connectors when binding admin handlers
@@ -80,8 +89,8 @@ public abstract class RestServer {
     protected final RestServerConfig config;
     private final ContextHandlerCollection handlers;
     private final Server jettyServer;
+    private final RequestTimeout requestTimeout;
 
-    private Collection<ConnectResource> resources;
     private List<ConnectRestExtension> connectRestExtensions = 
Collections.emptyList();
 
     /**
@@ -95,6 +104,7 @@ public abstract class RestServer {
 
         jettyServer = new Server();
         handlers = new ContextHandlerCollection();
+        requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
 
         createConnectors(listeners, adminListeners);
     }
@@ -207,44 +217,31 @@ public abstract class RestServer {
 
     protected final void initializeResources() {
         log.info("Initializing REST resources");
-        resources = new ArrayList<>();
-
-        ResourceConfig resourceConfig = new ResourceConfig();
-        resourceConfig.register(new JacksonJsonProvider());
 
-        Collection<ConnectResource> regularResources = regularResources();
+        ResourceConfig resourceConfig = newResourceConfig();
+        Collection<Class<?>> regularResources = regularResources();
         regularResources.forEach(resourceConfig::register);
-        resources.addAll(regularResources);
-
-        resourceConfig.register(ConnectExceptionMapper.class);
-        resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
-
         configureRegularResources(resourceConfig);
 
         List<String> adminListeners = config.adminListeners();
         ResourceConfig adminResourceConfig;
-        if (adminListeners == null) {
-            log.info("Adding admin resources to main listener");
-            adminResourceConfig = resourceConfig;
-            Collection<ConnectResource> adminResources = adminResources();
-            resources.addAll(adminResources);
-            adminResources.forEach(adminResourceConfig::register);
-            configureAdminResources(adminResourceConfig);
-        } else if (adminListeners.size() > 0) {
-            // TODO: we need to check if these listeners are same as 
'listeners'
-            // TODO: the following code assumes that they are different
-            log.info("Adding admin resources to admin listener");
-            adminResourceConfig = new ResourceConfig();
-            adminResourceConfig.register(new JacksonJsonProvider());
-            Collection<ConnectResource> adminResources = adminResources();
-            resources.addAll(adminResources);
-            adminResources.forEach(adminResourceConfig::register);
-            adminResourceConfig.register(ConnectExceptionMapper.class);
-            configureAdminResources(adminResourceConfig);
-        } else {
+        if (adminListeners != null && adminListeners.isEmpty()) {
             log.info("Skipping adding admin resources");
             // set up adminResource but add no handlers to it
             adminResourceConfig = resourceConfig;
+        } else {
+            if (adminListeners == null) {
+                log.info("Adding admin resources to main listener");
+                adminResourceConfig = resourceConfig;
+            } else {
+                // TODO: we need to check if these listeners are same as 
'listeners'
+                // TODO: the following code assumes that they are different
+                log.info("Adding admin resources to admin listener");
+                adminResourceConfig = newResourceConfig();
+            }
+            Collection<Class<?>> adminResources = adminResources();
+            adminResources.forEach(adminResourceConfig::register);
+            configureAdminResources(adminResourceConfig);
         }
 
         ServletContainer servletContainer = new 
ServletContainer(resourceConfig);
@@ -302,17 +299,26 @@ public abstract class RestServer {
         log.info("REST resources initialized; server is started and ready to 
handle requests");
     }
 
+    private ResourceConfig newResourceConfig() {
+        ResourceConfig result = new ResourceConfig();
+        result.register(new JacksonJsonProvider());
+        result.register(requestTimeout.binder());
+        result.register(ConnectExceptionMapper.class);
+        result.property(ServerProperties.WADL_FEATURE_DISABLE, true);
+        return result;
+    }
+
     /**
-     * @return the {@link ConnectResource resources} that should be registered 
with the
+     * @return the resources that should be registered with the
      * standard (i.e., non-admin) listener for this server; may be empty, but 
not null
      */
-    protected abstract Collection<ConnectResource> regularResources();
+    protected abstract Collection<Class<?>> regularResources();
 
     /**
-     * @return the {@link ConnectResource resources} that should be registered 
with the
+     * @return the resources that should be registered with the
      * admin listener for this server; may be empty, but not null
      */
-    protected abstract Collection<ConnectResource> adminResources();
+    protected abstract Collection<Class<?>> adminResources();
 
     /**
      * Pluggable hook to customize the regular (i.e., non-admin) resources on 
this server
@@ -438,7 +444,7 @@ public abstract class RestServer {
 
     // For testing only
     public void requestTimeout(long requestTimeoutMs) {
-        this.resources.forEach(resource -> 
resource.requestTimeout(requestTimeoutMs));
+        this.requestTimeout.timeoutMs(requestTimeoutMs);
     }
 
     String determineAdvertisedProtocol() {
@@ -488,7 +494,7 @@ public abstract class RestServer {
             config.restExtensions(),
             config, ConnectRestExtension.class);
 
-        long herderRequestTimeoutMs = 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+        long herderRequestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
 
@@ -520,4 +526,35 @@ public abstract class RestServer {
         headerFilterHolder.setInitParameter("headerConfig", headerConfig);
         context.addFilter(headerFilterHolder, "/*", 
EnumSet.of(DispatcherType.REQUEST));
     }
+
+    private static class RequestTimeout implements RestRequestTimeout {
+
+        private final RequestBinder binder;
+        private volatile long timeoutMs;
+
+        public RequestTimeout(long initialTimeoutMs) {
+            this.timeoutMs = initialTimeoutMs;
+            this.binder = new RequestBinder();
+        }
+
+        @Override
+        public long timeoutMs() {
+            return timeoutMs;
+        }
+
+        public void timeoutMs(long timeoutMs) {
+            this.timeoutMs = timeoutMs;
+        }
+
+        public Binder binder() {
+            return binder;
+        }
+
+        private class RequestBinder extends AbstractBinder {
+            @Override
+            protected void configure() {
+                bind(RequestTimeout.this).to(RestRequestTimeout.class);
+            }
+        }
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
deleted file mode 100644
index 49d61a727a9..00000000000
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime.rest.resources;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * This interface defines shared logic for all Connect REST resources.
- */
-public interface ConnectResource {
-
-    // TODO: This should not be so long. However, due to potentially long 
rebalances that may have to wait a full
-    // session timeout to complete, during which we cannot serve some 
requests. Ideally we could reduce this, but
-    // we need to consider all possible scenarios this could fail. It might be 
ok to fail with a timeout in rare cases,
-    // but currently a worker simply leaving the group can take this long as 
well.
-    long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
-
-    /**
-     * Set how long the resource will await the completion of each request 
before returning a 500 error.
-     * If the resource does not perform any operations that can be expected to 
block under reasonable
-     * circumstances, this can be implemented as a no-op.
-     * @param requestTimeoutMs the new timeout in milliseconds; must be 
positive
-     */
-    void requestTimeout(long requestTimeoutMs);
-
-}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 3c6cce98a05..7537a6fe1a7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
@@ -30,6 +31,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.apache.kafka.connect.util.Stage;
 import org.apache.kafka.connect.util.StagedTimeoutException;
 
+import javax.inject.Inject;
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
@@ -56,17 +58,18 @@ import java.util.stream.Collectors;
 @Path("/connector-plugins")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorPluginsResource implements ConnectResource {
+public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final Set<PluginInfo> connectorPlugins;
-    private long requestTimeoutMs;
+    private final RestRequestTimeout requestTimeout;
 
-    public ConnectorPluginsResource(Herder herder) {
+    @Inject
+    public ConnectorPluginsResource(Herder herder, RestRequestTimeout 
requestTimeout) {
         this.herder = herder;
+        this.requestTimeout = requestTimeout;
         this.connectorPlugins = new LinkedHashSet<>();
-        this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         // TODO: improve once plugins are allowed to be added/removed during 
runtime.
         addConnectorPlugins(herder.plugins().sinkConnectors());
@@ -83,11 +86,6 @@ public class ConnectorPluginsResource implements 
ConnectResource {
                 .forEach(connectorPlugins::add);
     }
 
-    @Override
-    public void requestTimeout(long requestTimeoutMs) {
-        this.requestTimeoutMs = requestTimeoutMs;
-    }
-
     @PUT
     @Path("/{pluginName}/config/validate")
     @Operation(summary = "Validate the provided configuration against the 
configuration definition for the specified pluginName")
@@ -109,7 +107,7 @@ public class ConnectorPluginsResource implements 
ConnectResource {
         herder.validateConnectorConfig(connectorConfig, validationCallback, 
false);
 
         try {
-            return validationCallback.get(requestTimeoutMs, 
TimeUnit.MILLISECONDS);
+            return validationCallback.get(requestTimeout.timeoutMs(), 
TimeUnit.MILLISECONDS);
         } catch (StagedTimeoutException e) {
             Stage stage = e.stage();
             String message;
@@ -136,7 +134,6 @@ public class ConnectorPluginsResource implements 
ConnectResource {
     }
 
     @GET
-    @Path("/")
     @Operation(summary = "List all connector plugins installed")
     public List<PluginInfo> listConnectorPlugins(
             @DefaultValue("true") @QueryParam("connectorsOnly") 
@Parameter(description = "Whether to list only connectors instead of all 
plugins") boolean connectorsOnly
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 75e510ef9ad..3b3a969a97d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -39,6 +40,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import javax.servlet.ServletContext;
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
@@ -70,7 +72,7 @@ import static 
org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Transla
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorsResource implements ConnectResource {
+public class ConnectorsResource {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectorsResource.class);
 
     private final Herder herder;
@@ -80,20 +82,20 @@ public class ConnectorsResource implements ConnectResource {
     private final boolean isTopicTrackingDisabled;
     private final boolean isTopicTrackingResetDisabled;
 
-    public ConnectorsResource(Herder herder, RestServerConfig config, 
RestClient restClient) {
+    @Inject
+    public ConnectorsResource(
+            Herder herder,
+            RestServerConfig config,
+            RestClient restClient,
+            RestRequestTimeout requestTimeout
+    ) {
         this.herder = herder;
-        this.requestHandler = new HerderRequestHandler(restClient, 
DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        this.requestHandler = new HerderRequestHandler(restClient, 
requestTimeout);
         this.isTopicTrackingDisabled = !config.topicTrackingEnabled();
         this.isTopicTrackingResetDisabled = 
!config.topicTrackingResetEnabled();
     }
 
-    @Override
-    public void requestTimeout(long requestTimeoutMs) {
-        requestHandler.requestTimeoutMs(requestTimeoutMs);
-    }
-
     @GET
-    @Path("/")
     @Operation(summary = "List all active connectors")
     public Response listConnectors(
         final @Context UriInfo uriInfo,
@@ -131,7 +133,6 @@ public class ConnectorsResource implements ConnectResource {
     }
 
     @POST
-    @Path("/")
     @Operation(summary = "Create a new connector")
     public Response createConnector(final @Parameter(hidden = true) 
@QueryParam("forward") Boolean forward,
                                     final @Context HttpHeaders headers,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
index c7bef991b41..5ee1d232a29 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.distributed.Crypto;
 import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import javax.ws.rs.POST;
@@ -45,7 +46,7 @@ import java.util.Map;
  * requests that originate from a user and are forwarded from one worker to 
another.
  */
 @Produces(MediaType.APPLICATION_JSON)
-public abstract class InternalClusterResource implements ConnectResource {
+public abstract class InternalClusterResource {
 
     private static final TypeReference<List<Map<String, String>>> 
TASK_CONFIGS_TYPE =
             new TypeReference<List<Map<String, String>>>() { };
@@ -56,13 +57,8 @@ public abstract class InternalClusterResource implements 
ConnectResource {
     @Context
     UriInfo uriInfo;
 
-    protected InternalClusterResource(RestClient restClient) {
-        this.requestHandler = new HerderRequestHandler(restClient, 
DEFAULT_REST_REQUEST_TIMEOUT_MS);
-    }
-
-    @Override
-    public void requestTimeout(long requestTimeoutMs) {
-        requestHandler.requestTimeoutMs(requestTimeoutMs);
+    protected InternalClusterResource(RestClient restClient, 
RestRequestTimeout requestTimeout) {
+        this.requestHandler = new HerderRequestHandler(restClient, 
requestTimeout);
     }
 
     /**
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
index c4987150dfb..228c7cd67ba 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
@@ -18,7 +18,9 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
 
+import javax.inject.Inject;
 import javax.ws.rs.Path;
 
 @Path("/connectors")
@@ -26,8 +28,9 @@ public class InternalConnectResource extends 
InternalClusterResource {
 
     private final Herder herder;
 
-    public InternalConnectResource(Herder herder, RestClient restClient) {
-        super(restClient);
+    @Inject
+    public InternalConnectResource(Herder herder, RestClient restClient, 
RestRequestTimeout requestTimeout) {
+        super(restClient, requestTimeout);
         this.herder = herder;
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index b215fe72adb..bbf5bfa0d55 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.log4j.Level;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -46,7 +47,7 @@ import java.util.Objects;
 @Path("/admin/loggers")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class LoggingResource implements ConnectResource {
+public class LoggingResource {
 
     private static final org.slf4j.Logger log = 
LoggerFactory.getLogger(LoggingResource.class);
 
@@ -55,22 +56,17 @@ public class LoggingResource implements ConnectResource {
 
     private final Herder herder;
 
+    @Inject
     public LoggingResource(Herder herder) {
         this.herder = herder;
     }
 
-    @Override
-    public void requestTimeout(long requestTimeoutMs) {
-        // No-op
-    }
-
     /**
      * List the current loggers that have their levels explicitly set and 
their log levels.
      *
      * @return a list of current loggers and their levels.
      */
     @GET
-    @Path("/")
     @Operation(summary = "List the current loggers that have their levels 
explicitly set and their log levels")
     public Response listLoggers() {
         return Response.ok(herder.allLoggerLevels()).build();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index fe09e269039..b112c59b820 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.Operation;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
+import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -27,21 +28,16 @@ import javax.ws.rs.core.MediaType;
 
 @Path("/")
 @Produces(MediaType.APPLICATION_JSON)
-public class RootResource implements ConnectResource {
+public class RootResource {
 
     private final Herder herder;
 
+    @Inject
     public RootResource(Herder herder) {
         this.herder = herder;
     }
 
-    @Override
-    public void requestTimeout(long requestTimeoutMs) {
-        // No-op
-    }
-
     @GET
-    @Path("/")
     @Operation(summary = "Get details about this Connect worker and the id of 
the Kafka cluster it is connected to")
     public ServerInfo serverInfo() {
         return new ServerInfo(herder.kafkaClusterId());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 03ace3b5706..6655e5a01ca 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -63,6 +62,7 @@ import java.util.stream.IntStream;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -379,7 +379,7 @@ public class BlockingConnectorTest {
             );
         }
         // Reset the REST request timeout so that other requests aren't 
impacted
-        
connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
     }
 
     private static class Block {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 6e49120343f..ec4d256c6e6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -63,7 +63,7 @@ import static 
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POL
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
-import static 
org.apache.kafka.connect.runtime.rest.resources.ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+import static 
org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
 import static 
org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.containsString;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 6a34fad6e93..a03378b1380 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -61,10 +61,10 @@ import 
org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.Message;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -2417,7 +2417,7 @@ public class WorkerTest {
         // Expect the call to Admin::deleteConsumerGroups to have a timeout 
value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS
         // minus the delay introduced in the call to 
Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call 
to
         // SinkConnector::alterOffsets (3000 ms)
-        assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 
2000L - 3000L,
+        assertEquals((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L 
- 3000L,
                 
deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue());
         verify(admin, timeout(1000)).close();
         verifyKafkaClusterId();
@@ -2469,7 +2469,7 @@ public class WorkerTest {
 
         when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
         when(sourceConnector.alterOffsets(eq(connectorProps), 
anyMap())).thenAnswer(invocation -> {
-            time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
+            time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
             return true;
         });
         ConnectorOffsetBackingStore offsetStore = 
mock(ConnectorOffsetBackingStore.class);
@@ -2507,7 +2507,7 @@ public class WorkerTest {
         when(plugins.withClassLoader(any(ClassLoader.class), 
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
 
         when(sinkConnector.alterOffsets(eq(connectorProps), 
anyMap())).thenAnswer(invocation -> {
-            time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
+            time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
             return true;
         });
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index f2978678bbf..1e06a776453 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -40,6 +40,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.LoggerFactory;
 
@@ -62,13 +63,13 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ConnectRestServerTest {
 
-    private Herder herder;
-    private Plugins plugins;
+    @Mock private RestClient restClient;
+    @Mock private Herder herder;
+    @Mock private Plugins plugins;
     private ConnectRestServer server;
     private CloseableHttpClient httpClient;
     private Collection<CloseableHttpResponse> responses = new ArrayList<>();
@@ -77,8 +78,6 @@ public class ConnectRestServerTest {
 
     @Before
     public void setUp() {
-        herder = mock(Herder.class);
-        plugins = mock(Plugins.class);
         httpClient = HttpClients.createMinimal();
     }
 
@@ -117,7 +116,7 @@ public class ConnectRestServerTest {
         Map<String, String> configMap = new HashMap<>(baseServerProps());
         configMap.put(RestServerConfig.LISTENERS_CONFIG, 
"http://localhost:8080,https://localhost:8443";);
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         Assert.assertEquals("http://localhost:8080/";, 
server.advertisedUrl().toString());
         server.stop();
 
@@ -126,7 +125,7 @@ public class ConnectRestServerTest {
         configMap.put(RestServerConfig.LISTENERS_CONFIG, 
"http://localhost:8080,https://localhost:8443";);
         configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, 
"https");
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         Assert.assertEquals("https://localhost:8443/";, 
server.advertisedUrl().toString());
         server.stop();
 
@@ -134,7 +133,7 @@ public class ConnectRestServerTest {
         configMap = new HashMap<>(baseServerProps());
         configMap.put(RestServerConfig.LISTENERS_CONFIG, 
"https://localhost:8443";);
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         Assert.assertEquals("https://localhost:8443/";, 
server.advertisedUrl().toString());
         server.stop();
 
@@ -145,7 +144,7 @@ public class ConnectRestServerTest {
         configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, 
"somehost");
         configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         Assert.assertEquals("http://somehost:10000/";, 
server.advertisedUrl().toString());
         server.stop();
 
@@ -154,7 +153,7 @@ public class ConnectRestServerTest {
         configMap.put(RestServerConfig.LISTENERS_CONFIG, 
"https://encrypted-localhost:42069,http://plaintext-localhost:4761";);
         configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, 
"http");
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         Assert.assertEquals("http://plaintext-localhost:4761/";, 
server.advertisedUrl().toString());
         server.stop();
     }
@@ -167,7 +166,7 @@ public class ConnectRestServerTest {
         doReturn(plugins).when(herder).plugins();
         expectEmptyRestExtensions();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -194,7 +193,7 @@ public class ConnectRestServerTest {
         expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         URI serverUrl = server.advertisedUrl();
@@ -237,7 +236,7 @@ public class ConnectRestServerTest {
         expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
@@ -260,7 +259,7 @@ public class ConnectRestServerTest {
         
doReturn(Collections.emptyList()).when(herder).setWorkerLoggerLevel(logger, 
loggingLevel);
         doReturn(Collections.singletonMap(logger, new 
LoggerLevel(loggingLevel, lastModified))).when(herder).allLoggerLevels();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -295,7 +294,7 @@ public class ConnectRestServerTest {
         LoggerFactory.getLogger("a.b.c.p.Y");
         LoggerFactory.getLogger("a.b.c.p.Z");
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -317,7 +316,7 @@ public class ConnectRestServerTest {
         doReturn(plugins).when(herder).plugins();
         expectEmptyRestExtensions();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -336,7 +335,7 @@ public class ConnectRestServerTest {
         doReturn(plugins).when(herder).plugins();
         expectEmptyRestExtensions();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -382,7 +381,7 @@ public class ConnectRestServerTest {
         expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new ConnectRestServer(null, null, configMap);
+        server = new ConnectRestServer(null, restClient, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 52ac14ca1cd..589dfb29bd0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -39,6 +39,8 @@ import 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
+import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -208,7 +210,8 @@ public class ConnectorPluginsResourceTest {
         doReturn(HEADER_CONVERTER_PLUGINS).when(plugins).headerConverters();
         doReturn(TRANSFORMATION_PLUGINS).when(plugins).transformations();
         doReturn(PREDICATE_PLUGINS).when(plugins).predicates();
-        connectorPluginsResource = new ConnectorPluginsResource(herder);
+        RestRequestTimeout requestTimeout = () -> 
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+        connectorPluginsResource = new ConnectorPluginsResource(herder, 
requestTimeout);
     }
 
     @Test
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index aed081cf4d6..3395690b7ad 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -28,6 +28,8 @@ import 
org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
+import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -146,6 +148,8 @@ public class ConnectorsResourceTest {
     private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
             Arrays.asList("foo_topic", "baz_topic"));
 
+    private static final RestRequestTimeout REQUEST_TIMEOUT = () -> 
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+
     @Mock
     private Herder herder;
     private ConnectorsResource connectorsResource;
@@ -159,7 +163,7 @@ public class ConnectorsResourceTest {
     public void setUp() throws NoSuchMethodException {
         when(serverConfig.topicTrackingEnabled()).thenReturn(true);
         when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
         forward = mock(UriInfo.class);
         MultivaluedMap<String, String> queryParams = new 
MultivaluedHashMap<>();
         queryParams.putSingle("forward", "true");
@@ -742,7 +746,7 @@ public class ConnectorsResourceTest {
     public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
         when(serverConfig.topicTrackingEnabled()).thenReturn(false);
         when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@@ -754,7 +758,7 @@ public class ConnectorsResourceTest {
         when(serverConfig.topicTrackingEnabled()).thenReturn(false);
         when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> 
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -766,7 +770,7 @@ public class ConnectorsResourceTest {
         when(serverConfig.topicTrackingEnabled()).thenReturn(true);
         when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> 
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -779,7 +783,7 @@ public class ConnectorsResourceTest {
         when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
         when(herder.connectorActiveTopics(CONNECTOR_NAME))
             .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, 
CONNECTOR_ACTIVE_TOPICS));
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
 
         Response response = 
connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
         assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@@ -792,7 +796,7 @@ public class ConnectorsResourceTest {
     @Test
     public void testResetConnectorActiveTopics() {
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, 
restClient, REQUEST_TIMEOUT);
 
         Response response = 
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
         verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
index 5bf33bf5300..0dff57fb593 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.util.Callback;
 import org.junit.Before;
 import org.junit.Test;
@@ -74,7 +75,7 @@ public class InternalConnectResourceTest {
 
     @Before
     public void setup() {
-        internalResource = new InternalConnectResource(herder, restClient);
+        internalResource = new InternalConnectResource(herder, restClient, () 
-> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
         internalResource.uriInfo = uriInfo;
     }
 

Reply via email to