This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new ec07ab91c0a KAFKA-16058: close controllerApi instance to avoid thread 
leaks (#15084)
ec07ab91c0a is described below

commit ec07ab91c0a5c3471ba702f4a3d5add189f1a660
Author: Luke Chen <[email protected]>
AuthorDate: Fri Dec 29 00:38:20 2023 +0900

    KAFKA-16058: close controllerApi instance to avoid thread leaks (#15084)
    
    The controllerApi will create some resources, including the reaper threads. 
In ControllerApisTest, we created it on many test cases, but didn't close it. 
This commit doesn't change anything in the business logic of the test, it just 
adds try/finally to close the controllerApi instance.
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../unit/kafka/server/ControllerApisTest.scala     | 1259 +++++++++++---------
 1 file changed, 719 insertions(+), 540 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 024f79d537c..66059238e5c 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -206,9 +206,15 @@ class ControllerApisTest {
 
   @Test
   def testUnauthorizedFetch(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 
12))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = createControllerApis(
+        Some(createDenyAllAuthorizer()), new MockController.Builder().build())
+      try {
+        controllerApis.handleFetch(buildRequest(new FetchRequest(new 
FetchRequestData(), 12)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
@@ -223,14 +229,18 @@ class ControllerApisTest {
       new CompletableFuture[ApiMessage]()
     )
 
-    createControllerApis(None, new MockController.Builder().build())
-      .handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12)))
+    val controllerApis = createControllerApis(None, new 
MockController.Builder().build())
+    try {
+      controllerApis.handleFetch(buildRequest(new FetchRequest(new 
FetchRequestData(), 12)))
 
-    verify(raftManager).handleRequest(
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any()
-    )
+      verify(raftManager).handleRequest(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+      )
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -253,26 +263,36 @@ class ControllerApisTest {
     // Local time should be updated when `ControllerApis.handle` returns
     val fetchRequestData = new FetchRequestData()
     val request = buildRequest(new FetchRequest(fetchRequestData, 
ApiKeys.FETCH.latestVersion))
-    createControllerApis(None, new MockController.Builder().build())
-      .handle(request, RequestLocal.NoCaching)
+    val controllerApis = createControllerApis(None, new 
MockController.Builder().build())
+    try {
+      controllerApis.handle(request, RequestLocal.NoCaching)
 
-    verify(raftManager).handleRequest(
-      ArgumentMatchers.eq(request.header),
-      ArgumentMatchers.eq(fetchRequestData),
-      ArgumentMatchers.eq(initialTimeMs)
-    )
 
-    assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
-      request.apiLocalCompleteTimeNanos - initialTimeNanos,
-      TimeUnit.NANOSECONDS
-    ))
+      verify(raftManager).handleRequest(
+        ArgumentMatchers.eq(request.header),
+        ArgumentMatchers.eq(fetchRequestData),
+        ArgumentMatchers.eq(initialTimeMs)
+      )
+
+      assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
+        request.apiLocalCompleteTimeNanos - initialTimeNanos,
+        TimeUnit.NANOSECONDS
+      ))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedFetchSnapshot(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new 
FetchSnapshotRequestData(), 0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleFetchSnapshot(buildRequest(new 
FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
@@ -287,21 +307,32 @@ class ControllerApisTest {
       new CompletableFuture[ApiMessage]()
     )
 
-    createControllerApis(None, new MockController.Builder().build())
-      .handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new 
FetchSnapshotRequestData(), 0)))
+    val controllerApis = createControllerApis(None, new 
MockController.Builder().build())
 
-    verify(raftManager).handleRequest(
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any()
-    )
+    try {
+      controllerApis.handleFetchSnapshot(buildRequest(new 
FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))
+
+      verify(raftManager).handleRequest(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+      )
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedVote(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleVote(buildRequest(new VoteRequest.Builder(new 
VoteRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = createControllerApis(
+        Some(createDenyAllAuthorizer()), new MockController.Builder().build())
+      try {
+        controllerApis.handleVote(buildRequest(new VoteRequest.Builder(new 
VoteRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
@@ -334,81 +365,115 @@ class ControllerApisTest {
             setValue("bar")).iterator())),
         ).iterator()))
     val request = buildRequest(new AlterConfigsRequest(requestData, 0))
-    createControllerApis(Some(createDenyAllAuthorizer()),
-      new MockController.Builder().build()).handleLegacyAlterConfigs(request)
-    val capturedResponse: ArgumentCaptor[AbstractResponse] =
-      ArgumentCaptor.forClass(classOf[AbstractResponse])
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None))
-    assertNotNull(capturedResponse.getValue)
-    val response = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
-    assertEquals(Set(
-      new OldAlterConfigsResourceResponse().
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate resource.").
-        setResourceName("2").
-        setResourceType(ConfigResource.Type.BROKER.id()),
-      new OldAlterConfigsResourceResponse().
-        setErrorCode(UNSUPPORTED_VERSION.code()).
-        setErrorMessage("Unknown resource type 123.").
-        setResourceName("baz").
-        setResourceType(123.toByte),
-      new OldAlterConfigsResourceResponse().
-        setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
-        setErrorMessage("Cluster authorization failed.").
-        setResourceName("1").
-        setResourceType(ConfigResource.Type.BROKER.id())),
-      response.data().responses().asScala.toSet)
+    val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()), 
new MockController.Builder().build())
+    try {
+      controllerApis.handleLegacyAlterConfigs(request)
+      val capturedResponse: ArgumentCaptor[AbstractResponse] =
+        ArgumentCaptor.forClass(classOf[AbstractResponse])
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None))
+      assertNotNull(capturedResponse.getValue)
+      val response = 
capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
+      assertEquals(Set(
+        new OldAlterConfigsResourceResponse().
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate resource.").
+          setResourceName("2").
+          setResourceType(ConfigResource.Type.BROKER.id()),
+        new OldAlterConfigsResourceResponse().
+          setErrorCode(UNSUPPORTED_VERSION.code()).
+          setErrorMessage("Unknown resource type 123.").
+          setResourceName("baz").
+          setResourceType(123.toByte),
+        new OldAlterConfigsResourceResponse().
+          setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
+          setErrorMessage("Cluster authorization failed.").
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER.id())),
+        response.data().responses().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedBeginQuorumEpoch(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleBeginQuorumEpoch(buildRequest(new 
BeginQuorumEpochRequest.Builder(
-          new BeginQuorumEpochRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleBeginQuorumEpoch(buildRequest(new 
BeginQuorumEpochRequest.Builder(
+          new BeginQuorumEpochRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedEndQuorumEpoch(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleEndQuorumEpoch(buildRequest(new EndQuorumEpochRequest.Builder(
-          new EndQuorumEpochRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleEndQuorumEpoch(buildRequest(new 
EndQuorumEpochRequest.Builder(
+          new EndQuorumEpochRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedDescribeQuorum(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleDescribeQuorum(buildRequest(new DescribeQuorumRequest.Builder(
-          new DescribeQuorumRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleDescribeQuorum(buildRequest(new 
DescribeQuorumRequest.Builder(
+          new DescribeQuorumRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedHandleAlterPartitionRequest(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleAlterPartitionRequest(buildRequest(new 
AlterPartitionRequest.Builder(
-          new AlterPartitionRequestData(), false).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleAlterPartitionRequest(buildRequest(new 
AlterPartitionRequest.Builder(
+          new AlterPartitionRequestData(), false).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedHandleBrokerHeartBeatRequest(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleBrokerHeartBeatRequest(buildRequest(new 
BrokerHeartbeatRequest.Builder(
-          new BrokerHeartbeatRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleBrokerHeartBeatRequest(buildRequest(new 
BrokerHeartbeatRequest.Builder(
+          new BrokerHeartbeatRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedHandleUnregisterBroker(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleUnregisterBroker(buildRequest(new 
UnregisterBrokerRequest.Builder(
-          new UnregisterBrokerRequestData()).build(0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleUnregisterBroker(buildRequest(new 
UnregisterBrokerRequest.Builder(
+          new UnregisterBrokerRequestData()).build(0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
@@ -429,26 +494,35 @@ class ControllerApisTest {
     val request = buildRequest(brokerRegistrationRequest)
     val capturedResponse: ArgumentCaptor[AbstractResponse] = 
ArgumentCaptor.forClass(classOf[AbstractResponse])
 
-    createControllerApis(Some(createDenyAllAuthorizer()), 
mock(classOf[Controller])).handle(request,
-      RequestLocal.withThreadConfinedCaching)
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None))
-
-    assertNotNull(capturedResponse.getValue)
-
-    val brokerRegistrationResponse = 
capturedResponse.getValue.asInstanceOf[BrokerRegistrationResponse]
-    assertEquals(Map(CLUSTER_AUTHORIZATION_FAILED -> 1),
-      brokerRegistrationResponse.errorCounts().asScala)
+    val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()), 
mock(classOf[Controller]))
+    try {
+      controllerApis.handle(request, RequestLocal.withThreadConfinedCaching)
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None))
+
+      assertNotNull(capturedResponse.getValue)
+
+      val brokerRegistrationResponse = 
capturedResponse.getValue.asInstanceOf[BrokerRegistrationResponse]
+      assertEquals(Map(CLUSTER_AUTHORIZATION_FAILED -> 1),
+        brokerRegistrationResponse.errorCounts().asScala)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedHandleAlterClientQuotas(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
-          new AlterClientQuotasRequestData(), 0))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleAlterClientQuotas(buildRequest(new 
AlterClientQuotasRequest(
+          new AlterClientQuotasRequestData(), 0)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
@@ -478,32 +552,37 @@ class ControllerApisTest {
             setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
         ).iterator()))
     val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData).build(0))
-    createControllerApis(Some(createDenyAllAuthorizer()),
-      new 
MockController.Builder().build()).handleIncrementalAlterConfigs(request)
-    val capturedResponse: ArgumentCaptor[AbstractResponse] =
-      ArgumentCaptor.forClass(classOf[AbstractResponse])
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None))
-    assertNotNull(capturedResponse.getValue)
-    val response = 
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
-    assertEquals(Set(new AlterConfigsResourceResponse().
+    val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()),
+      new MockController.Builder().build())
+    try {
+      controllerApis.handleIncrementalAlterConfigs(request)
+      val capturedResponse: ArgumentCaptor[AbstractResponse] =
+        ArgumentCaptor.forClass(classOf[AbstractResponse])
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None))
+      assertNotNull(capturedResponse.getValue)
+      val response = 
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
+      assertEquals(Set(new AlterConfigsResourceResponse().
         setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
         setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
         setResourceName("1").
         setResourceType(ConfigResource.Type.BROKER.id()),
-      new AlterConfigsResourceResponse().
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
-        setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message()).
-        setResourceName("foo").
-        setResourceType(ConfigResource.Type.TOPIC.id()),
-      new AlterConfigsResourceResponse().
-        setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
-        setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
-        setResourceName("sub").
-        setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
-      response.data().responses().asScala.toSet)
+        new AlterConfigsResourceResponse().
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+          setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message()).
+          setResourceName("foo").
+          setResourceType(ConfigResource.Type.TOPIC.id()),
+        new AlterConfigsResourceResponse().
+          setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
+          setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
+          setResourceName("sub").
+          setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
+        response.data().responses().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @ParameterizedTest
@@ -567,106 +646,129 @@ class ControllerApisTest {
     } else {
       None
     }
-    createControllerApis(authorizer,
-      new 
MockController.Builder().build()).handleIncrementalAlterConfigs(request)
-    val capturedResponse: ArgumentCaptor[AbstractResponse] =
-      ArgumentCaptor.forClass(classOf[AbstractResponse])
-    verify(requestChannel).sendResponse(
-      ArgumentMatchers.eq(request),
-      capturedResponse.capture(),
-      ArgumentMatchers.eq(None))
-    assertNotNull(capturedResponse.getValue)
-    val response = 
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
-    assertEquals(Set(
-      new AlterConfigsResourceResponse().
-        setErrorCode(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
-        setErrorMessage(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.message() else null).
-        setResourceName("1").
-        setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
-      new AlterConfigsResourceResponse().
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate resource.").
-        setResourceName("3").
-        setResourceType(ConfigResource.Type.BROKER.id()),
-      new AlterConfigsResourceResponse().
-        setErrorCode(UNSUPPORTED_VERSION.code()).
-        setErrorMessage("Unknown resource type 124.").
-        setResourceName("foo").
-        setResourceType(124.toByte),
-      new AlterConfigsResourceResponse().
-        setErrorCode(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
-        setErrorMessage(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.message() else null).
-        setResourceName("sub").
-        setResourceType(ConfigResource.Type.CLIENT_METRICS.id()),
-      new AlterConfigsResourceResponse().
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate resource.").
-        setResourceName("sub1").
-        setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
-      response.data().responses().asScala.toSet)
+    val controllerApis = createControllerApis(authorizer, new 
MockController.Builder().build())
+    try {
+      controllerApis.handleIncrementalAlterConfigs(request)
+      val capturedResponse: ArgumentCaptor[AbstractResponse] =
+        ArgumentCaptor.forClass(classOf[AbstractResponse])
+      verify(requestChannel).sendResponse(
+        ArgumentMatchers.eq(request),
+        capturedResponse.capture(),
+        ArgumentMatchers.eq(None))
+      assertNotNull(capturedResponse.getValue)
+      val response = 
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
+      assertEquals(Set(
+        new AlterConfigsResourceResponse().
+          setErrorCode(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
+          setErrorMessage(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.message() else null).
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
+        new AlterConfigsResourceResponse().
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate resource.").
+          setResourceName("3").
+          setResourceType(ConfigResource.Type.BROKER.id()),
+        new AlterConfigsResourceResponse().
+          setErrorCode(UNSUPPORTED_VERSION.code()).
+          setErrorMessage("Unknown resource type 124.").
+          setResourceName("foo").
+          setResourceType(124.toByte),
+        new AlterConfigsResourceResponse().
+          setErrorCode(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
+          setErrorMessage(if (denyAllAuthorizer) 
CLUSTER_AUTHORIZATION_FAILED.message() else null).
+          setResourceName("sub").
+          setResourceType(ConfigResource.Type.CLIENT_METRICS.id()),
+        new AlterConfigsResourceResponse().
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate resource.").
+          setResourceName("sub1").
+          setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
+        response.data().responses().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedHandleAlterPartitionReassignments(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleAlterPartitionReassignments(buildRequest(new 
AlterPartitionReassignmentsRequest.Builder(
-            new AlterPartitionReassignmentsRequestData()).build())))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleAlterPartitionReassignments(buildRequest(new 
AlterPartitionReassignmentsRequest.Builder(
+          new AlterPartitionReassignmentsRequestData()).build()))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedHandleAllocateProducerIds(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-      handleAllocateProducerIdsRequest(buildRequest(new 
AllocateProducerIdsRequest.Builder(
-        new AllocateProducerIdsRequestData()).build())))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleAllocateProducerIdsRequest(buildRequest(new 
AllocateProducerIdsRequest.Builder(
+          new AllocateProducerIdsRequestData()).build()))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedHandleListPartitionReassignments(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-      handleListPartitionReassignments(buildRequest(new 
ListPartitionReassignmentsRequest.Builder(
-        new ListPartitionReassignmentsRequestData()).build())))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleListPartitionReassignments(buildRequest(new 
ListPartitionReassignmentsRequest.Builder(
+          new ListPartitionReassignmentsRequestData()).build()))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testCreateTopics(): Unit = {
     val controller = new MockController.Builder().build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new CreateTopicsRequestData().setTopics(new 
CreatableTopicCollection(
-      util.Arrays.asList(new 
CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(3),
-        new 
CreatableTopic().setName("foo").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3),
-        new 
CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3),
-    ).iterator()))
-    val expectedResponse = Set(new CreatableTopicResult().setName("foo").
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate topic name."),
-      new CreatableTopicResult().setName("bar").
+    try {
+      val request = new CreateTopicsRequestData().setTopics(new 
CreatableTopicCollection(
+        util.Arrays.asList(new 
CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(3),
+          new 
CreatableTopic().setName("foo").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3),
+          new 
CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3),
+        ).iterator()))
+      val expectedResponse = Set(new CreatableTopicResult().setName("foo").
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic name."),
-      new CreatableTopicResult().setName("baz").
-        setErrorCode(NONE.code()).
-        setTopicId(new Uuid(0L, 1L)).
-        setNumPartitions(2).
-        setReplicationFactor(3).
-        setTopicConfigErrorCode(NONE.code()),
-      new CreatableTopicResult().setName("indescribable").
-        setErrorCode(NONE.code()).
-        setTopicId(new Uuid(0L, 2L)).
-        setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
-      new CreatableTopicResult().setName("quux").
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
-        setErrorMessage("Authorization failed."))
-    assertEquals(expectedResponse, 
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
-      false,
-      _ => Set("baz", "indescribable"),
-      _ => Set("baz")).get().topics().asScala.toSet)
+        new CreatableTopicResult().setName("bar").
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate topic name."),
+        new CreatableTopicResult().setName("baz").
+          setErrorCode(NONE.code()).
+          setTopicId(new Uuid(0L, 1L)).
+          setNumPartitions(2).
+          setReplicationFactor(3).
+          setTopicConfigErrorCode(NONE.code()),
+        new CreatableTopicResult().setName("indescribable").
+          setErrorCode(NONE.code()).
+          setTopicId(new Uuid(0L, 2L)).
+          setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
+        new CreatableTopicResult().setName("quux").
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+          setErrorMessage("Authorization failed."))
+      assertEquals(expectedResponse, 
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
+        false,
+        _ => Set("baz", "indescribable"),
+        _ => Set("baz")).get().topics().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
@@ -674,26 +776,30 @@ class ControllerApisTest {
   def testCreateTopicsMutationQuota(throttle: Boolean): Unit = {
     val controller = new MockController.Builder().build()
     val controllerApis = createControllerApis(None, controller, new 
Properties(), throttle)
-    val topicName = "foo"
-    val requestData = new CreateTopicsRequestData().setTopics(new 
CreatableTopicCollection(
-      util.Collections.singletonList(new 
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)).iterator()))
-    val request = new CreateTopicsRequest.Builder(requestData).build()
-    val expectedResponseDataUnthrottled = Set(new 
CreatableTopicResult().setName(topicName).
+    try {
+      val topicName = "foo"
+      val requestData = new CreateTopicsRequestData().setTopics(new 
CreatableTopicCollection(
+        util.Collections.singletonList(new 
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)).iterator()))
+      val request = new CreateTopicsRequest.Builder(requestData).build()
+      val expectedResponseDataUnthrottled = Set(new 
CreatableTopicResult().setName(topicName).
         setErrorCode(NONE.code()).
         setTopicId(new Uuid(0L, 1L)).
         setNumPartitions(1).
         setReplicationFactor(1).
         setTopicConfigErrorCode(NONE.code()))
-    val expectedResponseDataThrottled = Set(new 
CreatableTopicResult().setName(topicName).
-      setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
-      setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
-    val response = handleRequest[CreateTopicsResponse](request, controllerApis)
-    if (throttle) {
-      assertEquals(expectedResponseDataThrottled, 
response.data.topics().asScala.toSet)
-      assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
-    } else {
-      assertEquals(expectedResponseDataUnthrottled, 
response.data.topics().asScala.toSet)
-      assertEquals(0, response.throttleTimeMs())
+      val expectedResponseDataThrottled = Set(new 
CreatableTopicResult().setName(topicName).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+      val response = handleRequest[CreateTopicsResponse](request, 
controllerApis)
+      if (throttle) {
+        assertEquals(expectedResponseDataThrottled, 
response.data.topics().asScala.toSet)
+        assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
+      } else {
+        assertEquals(expectedResponseDataUnthrottled, 
response.data.topics().asScala.toSet)
+        assertEquals(0, response.throttleTimeMs())
+      }
+    } finally {
+      controllerApis.close()
     }
   }
 
@@ -702,20 +808,24 @@ class ControllerApisTest {
     val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
     val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData().setTopicNames(
-      util.Arrays.asList("foo", "bar", "quux", "quux"))
-    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+    try {
+      val request = new DeleteTopicsRequestData().setTopicNames(
+        util.Arrays.asList("foo", "bar", "quux", "quux"))
+      val expectedResponse = Set(new DeletableTopicResult().setName("quux").
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic name."),
-      new DeletableTopicResult().setName("bar").
-        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
-        setErrorMessage("This server does not host this topic-partition."),
-      new DeletableTopicResult().setName("foo").setTopicId(fooId))
-    assertEquals(expectedResponse, 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
-      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-      true,
-      _ => Set.empty,
-      _ => Set.empty).get().asScala.toSet)
+        new DeletableTopicResult().setName("bar").
+          setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+          setErrorMessage("This server does not host this topic-partition."),
+        new DeletableTopicResult().setName("foo").setTopicId(fooId))
+      assertEquals(expectedResponse, 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        true,
+        _ => Set.empty,
+        _ => Set.empty).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -725,23 +835,27 @@ class ControllerApisTest {
     val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
     val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
-    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(quuxId).
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+      val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(quuxId).
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic id."),
-      new DeletableTopicResult().setName(null).setTopicId(barId).
-        setErrorCode(UNKNOWN_TOPIC_ID.code()).
-        setErrorMessage("This server does not host this topic ID."),
-      new DeletableTopicResult().setName("foo").setTopicId(fooId))
-    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
-      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-      true,
-      _ => Set.empty,
-      _ => Set.empty).get().asScala.toSet)
+        new DeletableTopicResult().setName(null).setTopicId(barId).
+          setErrorCode(UNKNOWN_TOPIC_ID.code()).
+          setErrorMessage("This server does not host this topic ID."),
+        new DeletableTopicResult().setName("foo").setTopicId(fooId))
+      assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        true,
+        _ => Set.empty,
+        _ => Set.empty).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -753,37 +867,41 @@ class ControllerApisTest {
       newInitialTopic("foo", fooId).
       newInitialTopic("bar", barId).build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(fooId))
-    request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
-    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
-    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(fooId))
+      request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+      request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+      val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Neither topic name nor id were specified."),
-      new DeletableTopicResult().setName("foo").setTopicId(fooId).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("You may not specify both topic name and topic id."),
-      new DeletableTopicResult().setName("bar").setTopicId(barId).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("The provided topic name maps to an ID that was 
already supplied."),
-      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate topic name."),
-      new DeletableTopicResult().setName(null).setTopicId(bazId).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate topic id."))
-    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
-      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-      false,
-      names => names.toSet,
-      names => names.toSet).get().asScala.toSet)
+        new DeletableTopicResult().setName("foo").setTopicId(fooId).
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("You may not specify both topic name and topic id."),
+        new DeletableTopicResult().setName("bar").setTopicId(barId).
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("The provided topic name maps to an ID that was 
already supplied."),
+        new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate topic name."),
+        new DeletableTopicResult().setName(null).setTopicId(bazId).
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate topic id."))
+      assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        false,
+        names => names.toSet,
+        names => names.toSet).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -798,28 +916,32 @@ class ControllerApisTest {
       newInitialTopic("baz", bazId).
       newInitialTopic("quux", quuxId).build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
-    request.topics().add(new 
DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
-    val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(barId).
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+      request.topics().add(new 
DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+      val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(barId).
         setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
         setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
-      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
-        setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
-      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
-        setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
-      new DeletableTopicResult().setName("foo").setTopicId(fooId).
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
-        setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message))
-    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
-      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-      false,
-      _ => Set("foo", "baz"),
-      _ => Set.empty).get().asScala.toSet)
+        new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+          setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+        new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+          setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+        new DeletableTopicResult().setName("foo").setTopicId(fooId).
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+          setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message))
+      assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, 
request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        false,
+        _ => Set("foo", "baz"),
+        _ => Set.empty).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -827,24 +949,28 @@ class ControllerApisTest {
     val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
     val controller = new MockController.Builder().build()
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
-    val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+      val expectedResponse = Set(new DeletableTopicResult().setName("foo").
         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code).
         setErrorMessage(UNKNOWN_TOPIC_OR_PARTITION.message),
-      new DeletableTopicResult().setName("bar").
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
-        setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
-      new DeletableTopicResult().setName(null).setTopicId(barId).
-        setErrorCode(UNKNOWN_TOPIC_ID.code).
-        setErrorMessage(UNKNOWN_TOPIC_ID.message))
-    assertEquals(expectedResponse, 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
-      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-      false,
-      _ => Set("foo"),
-      _ => Set.empty).get().asScala.toSet)
+        new DeletableTopicResult().setName("bar").
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+          setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+        new DeletableTopicResult().setName(null).setTopicId(barId).
+          setErrorCode(UNKNOWN_TOPIC_ID.code).
+          setErrorMessage(UNKNOWN_TOPIC_ID.message))
+      assertEquals(expectedResponse, 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        false,
+        _ => Set("foo"),
+        _ => Set.empty).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -855,15 +981,19 @@ class ControllerApisTest {
       newInitialTopic("foo", fooId).build()
     controller.setActive(false)
     val controllerApis = createControllerApis(None, controller)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
-    request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
-    assertEquals(classOf[NotControllerException], assertThrows(
-      classOf[ExecutionException], () => 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
-        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-        false,
-        _ => Set("foo", "bar"),
-        _ => Set("foo", "bar")).get()).getCause.getClass)
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+      request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+      assertEquals(classOf[NotControllerException], assertThrows(
+        classOf[ExecutionException], () => 
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+          ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+          false,
+          _ => Set("foo", "bar"),
+          _ => Set("foo", "bar")).get()).getCause.getClass)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -874,20 +1004,24 @@ class ControllerApisTest {
     val props = new Properties()
     props.put(KafkaConfig.DeleteTopicEnableProp, "false")
     val controllerApis = createControllerApis(None, controller, props)
-    val request = new DeleteTopicsRequestData()
-    request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
-    assertThrows(classOf[TopicDeletionDisabledException],
-      () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
-        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
-        false,
-        _ => Set("foo", "bar"),
-        _ => Set("foo", "bar")))
-    assertThrows(classOf[InvalidRequestException],
-      () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
-        1,
-        false,
-        _ => Set("foo", "bar"),
-        _ => Set("foo", "bar")))
+    try {
+      val request = new DeleteTopicsRequestData()
+      request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+      assertThrows(classOf[TopicDeletionDisabledException],
+        () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+          ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+          false,
+          _ => Set("foo", "bar"),
+          _ => Set("foo", "bar")))
+      assertThrows(classOf[InvalidRequestException],
+        () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+          1,
+          false,
+          _ => Set("foo", "bar"),
+          _ => Set("foo", "bar")))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @ParameterizedTest
@@ -895,37 +1029,41 @@ class ControllerApisTest {
   def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
     val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
-    val request = new CreatePartitionsRequestData()
-    request.topics().add(new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
-    request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
-    request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
-    request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
-    request.topics().add(new 
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
-    request.setValidateOnly(validateOnly)
-
-    // Check if the controller is called correctly with the 'validateOnly' 
field set appropriately.
-    when(controller.createPartitions(
-      any(),
-      ArgumentMatchers.eq(
-        Collections.singletonList(
-          new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
-      ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
-      .completedFuture(Collections.singletonList(
-        new CreatePartitionsTopicResult().setName("foo").
-          setErrorCode(NONE.code()).
-          setErrorMessage(null)
-      )))
-    assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
-      setErrorCode(NONE.code()).
-      setErrorMessage(null),
-      new CreatePartitionsTopicResult().setName("bar").
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Duplicate topic name."),
-      new CreatePartitionsTopicResult().setName("baz").
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
-        setErrorMessage(null)),
-      controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
-        _ => Set("foo", "bar")).get().asScala.toSet)
+    try {
+      val request = new CreatePartitionsRequestData()
+      request.topics().add(new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
+      request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+      request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+      request.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+      request.topics().add(new 
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+      request.setValidateOnly(validateOnly)
+
+      // Check if the controller is called correctly with the 'validateOnly' 
field set appropriately.
+      when(controller.createPartitions(
+        any(),
+        ArgumentMatchers.eq(
+          Collections.singletonList(
+            new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+        ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+        .completedFuture(Collections.singletonList(
+          new CreatePartitionsTopicResult().setName("foo").
+            setErrorCode(NONE.code()).
+            setErrorMessage(null)
+        )))
+      assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
+        setErrorCode(NONE.code()).
+        setErrorMessage(null),
+        new CreatePartitionsTopicResult().setName("bar").
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Duplicate topic name."),
+        new CreatePartitionsTopicResult().setName("baz").
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+          setErrorMessage(null)),
+        controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
+          _ => Set("foo", "bar")).get().asScala.toSet)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -935,35 +1073,38 @@ class ControllerApisTest {
       .build()
     val authorizer = mock(classOf[Authorizer])
     val controllerApis = createControllerApis(Some(authorizer), controller)
-
-    val requestData = new CreatePartitionsRequestData()
-    requestData.topics().add(new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
-    requestData.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
-    val request = new CreatePartitionsRequest.Builder(requestData).build()
-
-    val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", 
PatternType.LITERAL)
-    val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
-
-    val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", 
PatternType.LITERAL)
-    val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
-
-    when(authorizer.authorize(
-      any[RequestContext],
-      any[util.List[Action]]
-    )).thenAnswer { invocation =>
-      val actions = invocation.getArgument[util.List[Action]](1).asScala
-      val results = actions.map { action =>
-        if (action == fooAction) AuthorizationResult.ALLOWED
-        else if (action == barAction) AuthorizationResult.DENIED
-        else throw new AssertionError(s"Unexpected action $action")
+    try {
+      val requestData = new CreatePartitionsRequestData()
+      requestData.topics().add(new 
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
+      requestData.topics().add(new 
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
+      val request = new CreatePartitionsRequest.Builder(requestData).build()
+
+      val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", 
PatternType.LITERAL)
+      val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, 
true)
+
+      val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", 
PatternType.LITERAL)
+      val barAction = new Action(AclOperation.ALTER, barResource, 1, true, 
true)
+
+      when(authorizer.authorize(
+        any[RequestContext],
+        any[util.List[Action]]
+      )).thenAnswer { invocation =>
+        val actions = invocation.getArgument[util.List[Action]](1).asScala
+        val results = actions.map { action =>
+          if (action == fooAction) AuthorizationResult.ALLOWED
+          else if (action == barAction) AuthorizationResult.DENIED
+          else throw new AssertionError(s"Unexpected action $action")
+        }
+        new util.ArrayList[AuthorizationResult](results.asJava)
       }
-      new util.ArrayList[AuthorizationResult](results.asJava)
-    }
 
-    val response = handleRequest[CreatePartitionsResponse](request, 
controllerApis)
-    val results = response.data.results.asScala
-    assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result 
=> Errors.forCode(result.errorCode)))
-    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name 
== "bar").map(result => Errors.forCode(result.errorCode)))
+      val response = handleRequest[CreatePartitionsResponse](request, 
controllerApis)
+      val results = response.data.results.asScala
+      assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result 
=> Errors.forCode(result.errorCode)))
+      assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), 
results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @ParameterizedTest(name = "testCreatePartitionsMutationQuota with throttle: 
{0}")
@@ -974,21 +1115,25 @@ class ControllerApisTest {
       .newInitialTopic(topicName, Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), 1)
       .build()
     val controllerApis = createControllerApis(None, controller, new 
Properties(), throttle)
-    val requestData = new CreatePartitionsRequestData()
-    requestData.topics().add(new 
CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2))
-    val request = new CreatePartitionsRequest.Builder(requestData).build()
-    val expectedResponseDataUnthrottled = Set(new 
CreatePartitionsTopicResult().setName(topicName).
-      setErrorCode(NONE.code()))
-    val expectedResponseDataThrottled = Set(new 
CreatePartitionsTopicResult().setName(topicName).
-      setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
-      setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
-    val response = handleRequest[CreatePartitionsResponse](request, 
controllerApis)
-    if (throttle) {
-      assertEquals(expectedResponseDataThrottled, 
response.data.results().asScala.toSet)
-      assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
-    } else {
-      assertEquals(expectedResponseDataUnthrottled, 
response.data.results().asScala.toSet)
-      assertEquals(0, response.throttleTimeMs())
+    try {
+      val requestData = new CreatePartitionsRequestData()
+      requestData.topics().add(new 
CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2))
+      val request = new CreatePartitionsRequest.Builder(requestData).build()
+      val expectedResponseDataUnthrottled = Set(new 
CreatePartitionsTopicResult().setName(topicName).
+        setErrorCode(NONE.code()))
+      val expectedResponseDataThrottled = Set(new 
CreatePartitionsTopicResult().setName(topicName).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+      val response = handleRequest[CreatePartitionsResponse](request, 
controllerApis)
+      if (throttle) {
+        assertEquals(expectedResponseDataThrottled, 
response.data.results().asScala.toSet)
+        assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
+      } else {
+        assertEquals(expectedResponseDataUnthrottled, 
response.data.results().asScala.toSet)
+        assertEquals(0, response.throttleTimeMs())
+      }
+    } finally {
+      controllerApis.close()
     }
   }
 
@@ -997,45 +1142,51 @@ class ControllerApisTest {
     val authorizer = mock(classOf[Authorizer])
     val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(Some(authorizer), controller)
-
-    val request = new ElectLeadersRequest.Builder(
-      ElectionType.PREFERRED,
-      null,
-      30000
-    ).build()
-
-    val resource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
-    val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, 
true, true))
-
-    when(authorizer.authorize(
-      any[RequestContext],
-      ArgumentMatchers.eq(actions)
-    )).thenReturn(singletonList(AuthorizationResult.DENIED))
-
-    val response = handleRequest[ElectLeadersResponse](request, controllerApis)
-    assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
Errors.forCode(response.data.errorCode))
+    try {
+      val request = new ElectLeadersRequest.Builder(
+        ElectionType.PREFERRED,
+        null,
+        30000
+      ).build()
+
+      val resource = new ResourcePattern(ResourceType.CLUSTER, 
Resource.CLUSTER_NAME, PatternType.LITERAL)
+      val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, 
true, true))
+
+      when(authorizer.authorize(
+        any[RequestContext],
+        ArgumentMatchers.eq(actions)
+      )).thenReturn(singletonList(AuthorizationResult.DENIED))
+
+      val response = handleRequest[ElectLeadersResponse](request, 
controllerApis)
+      assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, 
Errors.forCode(response.data.errorCode))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testElectLeadersHandledByController(): Unit = {
     val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
-
-    val request = new ElectLeadersRequest.Builder(
-      ElectionType.PREFERRED,
-      null,
-      30000
-    ).build()
-
-    val responseData = new ElectLeadersResponseData()
+    try {
+      val request = new ElectLeadersRequest.Builder(
+        ElectionType.PREFERRED,
+        null,
+        30000
+      ).build()
+
+      val responseData = new ElectLeadersResponseData()
         .setErrorCode(Errors.NOT_CONTROLLER.code)
 
-    when(controller.electLeaders(any[ControllerRequestContext],
-      ArgumentMatchers.eq(request.data)
-    )).thenReturn(CompletableFuture.completedFuture(responseData))
+      when(controller.electLeaders(any[ControllerRequestContext],
+        ArgumentMatchers.eq(request.data)
+      )).thenReturn(CompletableFuture.completedFuture(responseData))
 
-    val response = handleRequest[ElectLeadersResponse](request, controllerApis)
-    assertEquals(Errors.NOT_CONTROLLER, 
Errors.forCode(response.data.errorCode))
+      val response = handleRequest[ElectLeadersResponse](request, 
controllerApis)
+      assertEquals(Errors.NOT_CONTROLLER, 
Errors.forCode(response.data.errorCode))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
@@ -1044,39 +1195,42 @@ class ControllerApisTest {
     val topicName = "foo"
     val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
+    try {
+      val findNamesFuture = CompletableFuture.completedFuture(
+        singletonMap(topicId, new ResultOrError(topicName))
+      )
+      when(controller.findTopicNames(
+        any[ControllerRequestContext],
+        ArgumentMatchers.eq(singleton(topicId))
+      )).thenReturn(findNamesFuture)
 
-    val findNamesFuture = CompletableFuture.completedFuture(
-      singletonMap(topicId, new ResultOrError(topicName))
-    )
-    when(controller.findTopicNames(
-      any[ControllerRequestContext],
-      ArgumentMatchers.eq(singleton(topicId))
-    )).thenReturn(findNamesFuture)
-
-    val findIdsFuture = CompletableFuture.completedFuture(
-      Collections.emptyMap[String, ResultOrError[Uuid]]()
-    )
-    when(controller.findTopicIds(
-      any[ControllerRequestContext],
-      ArgumentMatchers.eq(Collections.emptySet())
-    )).thenReturn(findIdsFuture)
-
-    val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
-    deleteFuture.completeExceptionally(new NotControllerException("Controller 
has moved"))
-    when(controller.deleteTopics(
-      any[ControllerRequestContext],
-      ArgumentMatchers.eq(singleton(topicId))
-    )).thenReturn(deleteFuture)
-
-    val request = new DeleteTopicsRequest.Builder(
-      new DeleteTopicsRequestData().setTopics(singletonList(
-        new DeleteTopicState().setTopicId(topicId)
-      ))
-    ).build()
-
-    val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
-    val topicIdResponse = response.data.responses.asScala.find(_.topicId == 
topicId).get
-    assertEquals(Errors.NOT_CONTROLLER, 
Errors.forCode(topicIdResponse.errorCode))
+      val findIdsFuture = CompletableFuture.completedFuture(
+        Collections.emptyMap[String, ResultOrError[Uuid]]()
+      )
+      when(controller.findTopicIds(
+        any[ControllerRequestContext],
+        ArgumentMatchers.eq(Collections.emptySet())
+      )).thenReturn(findIdsFuture)
+
+      val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
+      deleteFuture.completeExceptionally(new 
NotControllerException("Controller has moved"))
+      when(controller.deleteTopics(
+        any[ControllerRequestContext],
+        ArgumentMatchers.eq(singleton(topicId))
+      )).thenReturn(deleteFuture)
+
+      val request = new DeleteTopicsRequest.Builder(
+        new DeleteTopicsRequestData().setTopics(singletonList(
+          new DeleteTopicState().setTopicId(topicId)
+        ))
+      ).build()
+
+      val response = handleRequest[DeleteTopicsResponse](request, 
controllerApis)
+      val topicIdResponse = response.data.responses.asScala.find(_.topicId == 
topicId).get
+      assertEquals(Errors.NOT_CONTROLLER, 
Errors.forCode(topicIdResponse.errorCode))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @ParameterizedTest(name = "testDeleteTopicsMutationQuota with throttle: {0}")
@@ -1088,23 +1242,27 @@ class ControllerApisTest {
       .newInitialTopic(topicName, topicId, 1)
       .build()
     val controllerApis = createControllerApis(None, controller, new 
Properties(), throttle)
-    val requestData = new DeleteTopicsRequestData().setTopics(singletonList(
-      new DeleteTopicState().setTopicId(topicId)))
-    val request = new DeleteTopicsRequest.Builder(requestData).build()
-    val expectedResponseDataUnthrottled = Set(new 
DeletableTopicResult().setName(topicName).
-      setTopicId(topicId).
-      setErrorCode(NONE.code()))
-    val expectedResponseDataThrottled = Set(new 
DeletableTopicResult().setName(topicName).
-      setTopicId(topicId).
-      setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
-      setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
-    val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
-    if (throttle) {
-      assertEquals(expectedResponseDataThrottled, 
response.data.responses().asScala.toSet)
-      assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
-    } else {
-      assertEquals(expectedResponseDataUnthrottled, 
response.data.responses().asScala.toSet)
-      assertEquals(0, response.throttleTimeMs())
+    try {
+      val requestData = new DeleteTopicsRequestData().setTopics(singletonList(
+        new DeleteTopicState().setTopicId(topicId)))
+      val request = new DeleteTopicsRequest.Builder(requestData).build()
+      val expectedResponseDataUnthrottled = Set(new 
DeletableTopicResult().setName(topicName).
+        setTopicId(topicId).
+        setErrorCode(NONE.code()))
+      val expectedResponseDataThrottled = Set(new 
DeletableTopicResult().setName(topicName).
+        setTopicId(topicId).
+        setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+        setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+      val response = handleRequest[DeleteTopicsResponse](request, 
controllerApis)
+      if (throttle) {
+        assertEquals(expectedResponseDataThrottled, 
response.data.responses().asScala.toSet)
+        assertEquals(MockControllerMutationQuota.throttleTimeMs, 
response.throttleTimeMs())
+      } else {
+        assertEquals(expectedResponseDataUnthrottled, 
response.data.responses().asScala.toSet)
+        assertEquals(0, response.throttleTimeMs())
+      }
+    } finally {
+      controllerApis.close()
     }
   }
 
@@ -1112,31 +1270,34 @@ class ControllerApisTest {
   def testAllocateProducerIdsReturnsNotController(): Unit = {
     val controller = mock(classOf[Controller])
     val controllerApis = createControllerApis(None, controller)
-
-    // We construct the future here to mimic the logic in 
`QuorumController.allocateProducerIds`.
-    // When an exception is raised on the original future, the `thenApply` 
future is also completed
-    // exceptionally, but the underlying cause is wrapped in a 
`CompletionException`.
-    val future = new CompletableFuture[ProducerIdsBlock]
-    val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] { 
result =>
-      new AllocateProducerIdsResponseData()
-        .setProducerIdStart(result.firstProducerId())
-        .setProducerIdLen(result.size())
+    try {
+      // We construct the future here to mimic the logic in 
`QuorumController.allocateProducerIds`.
+      // When an exception is raised on the original future, the `thenApply` 
future is also completed
+      // exceptionally, but the underlying cause is wrapped in a 
`CompletionException`.
+      val future = new CompletableFuture[ProducerIdsBlock]
+      val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] 
{ result =>
+        new AllocateProducerIdsResponseData()
+          .setProducerIdStart(result.firstProducerId())
+          .setProducerIdLen(result.size())
+      }
+      future.completeExceptionally(new NotControllerException("Controller has 
moved"))
+
+      val request = new AllocateProducerIdsRequest.Builder(
+        new AllocateProducerIdsRequestData()
+          .setBrokerId(4)
+          .setBrokerEpoch(93234)
+      ).build()
+
+      when(controller.allocateProducerIds(
+        any[ControllerRequestContext],
+        ArgumentMatchers.eq(request.data)
+      )).thenReturn(thenApplyFuture)
+
+      val response = handleRequest[AllocateProducerIdsResponse](request, 
controllerApis)
+      assertEquals(Errors.NOT_CONTROLLER, response.error)
+    } finally {
+      controllerApis.close()
     }
-    future.completeExceptionally(new NotControllerException("Controller has 
moved"))
-
-    val request = new AllocateProducerIdsRequest.Builder(
-      new AllocateProducerIdsRequestData()
-        .setBrokerId(4)
-        .setBrokerEpoch(93234)
-    ).build()
-
-    when(controller.allocateProducerIds(
-      any[ControllerRequestContext],
-      ArgumentMatchers.eq(request.data)
-    )).thenReturn(thenApplyFuture)
-
-    val response = handleRequest[AllocateProducerIdsResponse](request, 
controllerApis)
-    assertEquals(Errors.NOT_CONTROLLER, response.error)
   }
 
   @Test
@@ -1144,20 +1305,23 @@ class ControllerApisTest {
     val controller = mock(classOf[Controller])
     val authorizer = mock(classOf[Authorizer])
     val controllerApis = createControllerApis(Some(authorizer), controller)
-
-    val request = new AssignReplicasToDirsRequest.Builder(new 
AssignReplicasToDirsRequestData()).build()
-
-    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(Collections.singletonList(new Action(
-      AclOperation.CLUSTER_ACTION,
-      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, 
PatternType.LITERAL),
-      1, true, true
-    )))))
-      .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
-    when(controller.assignReplicasToDirs(any[ControllerRequestContext], 
ArgumentMatchers.eq(request.data)))
-      
.thenReturn(FutureUtils.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
-
-    val response = handleRequest[AssignReplicasToDirsResponse](request, 
controllerApis)
-    assertEquals(new 
AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
 response.data)
+    try {
+      val request = new AssignReplicasToDirsRequest.Builder(new 
AssignReplicasToDirsRequestData()).build()
+
+      when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(Collections.singletonList(new Action(
+        AclOperation.CLUSTER_ACTION,
+        new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, 
PatternType.LITERAL),
+        1, true, true
+      )))))
+        .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+      when(controller.assignReplicasToDirs(any[ControllerRequestContext], 
ArgumentMatchers.eq(request.data)))
+        
.thenReturn(FutureUtils.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
+
+      val response = handleRequest[AssignReplicasToDirsResponse](request, 
controllerApis)
+      assertEquals(new 
AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
 response.data)
+    } finally {
+      controllerApis.close()
+    }
   }
 
   private def handleRequest[T <: AbstractResponse](
@@ -1185,6 +1349,7 @@ class ControllerApisTest {
         throw new ClassCastException(s"Expected response with type 
${classTag.runtimeClass}, " +
           s"but found ${response.getClass}")
     }
+
   }
 
   @Test
@@ -1205,31 +1370,45 @@ class ControllerApisTest {
     }
 
     // Calling handle does not block since we do not call get() in 
ControllerApis
-    createControllerApis(None,
-      new MockController.Builder().build()).handle(request, null)
-
-    // When we complete this future, the completion stages will fire 
(including the error handler in ControllerApis#request)
-    responseFuture.complete(response)
-
-    // Now we should get an error response with UNSUPPORTED_VERSION
-    val errorResponse = errorResponseFuture.get()
-    assertEquals(1, 
errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+    val controllerApis = createControllerApis(None, new 
MockController.Builder().build())
+    try {
+      controllerApis.handle(request, null)
+
+      // When we complete this future, the completion stages will fire 
(including the error handler in ControllerApis#request)
+      responseFuture.complete(response)
+
+      // Now we should get an error response with UNSUPPORTED_VERSION
+      val errorResponse = errorResponseFuture.get()
+      assertEquals(1, 
errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+    } finally {
+      controllerApis.close()
+    }
   }
 
   @Test
   def testUnauthorizedControllerRegistrationRequest(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleControllerRegistration(buildRequest(
-          new ControllerRegistrationRequest(new 
ControllerRegistrationRequestData(), 0.toShort))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleControllerRegistration(buildRequest(
+            new ControllerRegistrationRequest(new 
ControllerRegistrationRequestData(), 0.toShort)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @Test
   def testUnauthorizedDescribeClusterRequest(): Unit = {
-    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
-      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
-        handleDescribeCluster(buildRequest(
-          new DescribeClusterRequest(new DescribeClusterRequestData(), 
1.toShort))))
+    assertThrows(classOf[ClusterAuthorizationException], () => {
+      val controllerApis = 
createControllerApis(Some(createDenyAllAuthorizer()), new 
MockController.Builder().build())
+      try {
+        controllerApis.handleDescribeCluster(buildRequest(
+            new DescribeClusterRequest(new DescribeClusterRequestData(), 
1.toShort)))
+      } finally {
+        controllerApis.close()
+      }
+    })
   }
 
   @AfterEach

Reply via email to