This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new ec07ab91c0a KAFKA-16058: close controllerApi instance to avoid thread
leaks (#15084)
ec07ab91c0a is described below
commit ec07ab91c0a5c3471ba702f4a3d5add189f1a660
Author: Luke Chen <[email protected]>
AuthorDate: Fri Dec 29 00:38:20 2023 +0900
KAFKA-16058: close controllerApi instance to avoid thread leaks (#15084)
The controllerApi will create some resources, including the reaper threads.
In ControllerApisTest, we created it on many test cases, but didn't close it.
This commit doesn't change anything in the business logic of the test, it just
adds try/finally to close the controllerApi instance.
Reviewers: Divij Vaidya <[email protected]>
---
.../unit/kafka/server/ControllerApisTest.scala | 1259 +++++++++++---------
1 file changed, 719 insertions(+), 540 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 024f79d537c..66059238e5c 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -206,9 +206,15 @@ class ControllerApisTest {
@Test
def testUnauthorizedFetch(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleFetch(buildRequest(new FetchRequest(new FetchRequestData(),
12))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis = createControllerApis(
+ Some(createDenyAllAuthorizer()), new MockController.Builder().build())
+ try {
+ controllerApis.handleFetch(buildRequest(new FetchRequest(new
FetchRequestData(), 12)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
@@ -223,14 +229,18 @@ class ControllerApisTest {
new CompletableFuture[ApiMessage]()
)
- createControllerApis(None, new MockController.Builder().build())
- .handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12)))
+ val controllerApis = createControllerApis(None, new
MockController.Builder().build())
+ try {
+ controllerApis.handleFetch(buildRequest(new FetchRequest(new
FetchRequestData(), 12)))
- verify(raftManager).handleRequest(
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
- )
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -253,26 +263,36 @@ class ControllerApisTest {
// Local time should be updated when `ControllerApis.handle` returns
val fetchRequestData = new FetchRequestData()
val request = buildRequest(new FetchRequest(fetchRequestData,
ApiKeys.FETCH.latestVersion))
- createControllerApis(None, new MockController.Builder().build())
- .handle(request, RequestLocal.NoCaching)
+ val controllerApis = createControllerApis(None, new
MockController.Builder().build())
+ try {
+ controllerApis.handle(request, RequestLocal.NoCaching)
- verify(raftManager).handleRequest(
- ArgumentMatchers.eq(request.header),
- ArgumentMatchers.eq(fetchRequestData),
- ArgumentMatchers.eq(initialTimeMs)
- )
- assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
- request.apiLocalCompleteTimeNanos - initialTimeNanos,
- TimeUnit.NANOSECONDS
- ))
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.eq(request.header),
+ ArgumentMatchers.eq(fetchRequestData),
+ ArgumentMatchers.eq(initialTimeMs)
+ )
+
+ assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
+ request.apiLocalCompleteTimeNanos - initialTimeNanos,
+ TimeUnit.NANOSECONDS
+ ))
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedFetchSnapshot(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new
FetchSnapshotRequestData(), 0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleFetchSnapshot(buildRequest(new
FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
@@ -287,21 +307,32 @@ class ControllerApisTest {
new CompletableFuture[ApiMessage]()
)
- createControllerApis(None, new MockController.Builder().build())
- .handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new
FetchSnapshotRequestData(), 0)))
+ val controllerApis = createControllerApis(None, new
MockController.Builder().build())
- verify(raftManager).handleRequest(
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any()
- )
+ try {
+ controllerApis.handleFetchSnapshot(buildRequest(new
FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))
+
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedVote(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleVote(buildRequest(new VoteRequest.Builder(new
VoteRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis = createControllerApis(
+ Some(createDenyAllAuthorizer()), new MockController.Builder().build())
+ try {
+ controllerApis.handleVote(buildRequest(new VoteRequest.Builder(new
VoteRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
@@ -334,81 +365,115 @@ class ControllerApisTest {
setValue("bar")).iterator())),
).iterator()))
val request = buildRequest(new AlterConfigsRequest(requestData, 0))
- createControllerApis(Some(createDenyAllAuthorizer()),
- new MockController.Builder().build()).handleLegacyAlterConfigs(request)
- val capturedResponse: ArgumentCaptor[AbstractResponse] =
- ArgumentCaptor.forClass(classOf[AbstractResponse])
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None))
- assertNotNull(capturedResponse.getValue)
- val response = capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
- assertEquals(Set(
- new OldAlterConfigsResourceResponse().
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate resource.").
- setResourceName("2").
- setResourceType(ConfigResource.Type.BROKER.id()),
- new OldAlterConfigsResourceResponse().
- setErrorCode(UNSUPPORTED_VERSION.code()).
- setErrorMessage("Unknown resource type 123.").
- setResourceName("baz").
- setResourceType(123.toByte),
- new OldAlterConfigsResourceResponse().
- setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
- setErrorMessage("Cluster authorization failed.").
- setResourceName("1").
- setResourceType(ConfigResource.Type.BROKER.id())),
- response.data().responses().asScala.toSet)
+ val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()),
new MockController.Builder().build())
+ try {
+ controllerApis.handleLegacyAlterConfigs(request)
+ val capturedResponse: ArgumentCaptor[AbstractResponse] =
+ ArgumentCaptor.forClass(classOf[AbstractResponse])
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None))
+ assertNotNull(capturedResponse.getValue)
+ val response =
capturedResponse.getValue.asInstanceOf[AlterConfigsResponse]
+ assertEquals(Set(
+ new OldAlterConfigsResourceResponse().
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate resource.").
+ setResourceName("2").
+ setResourceType(ConfigResource.Type.BROKER.id()),
+ new OldAlterConfigsResourceResponse().
+ setErrorCode(UNSUPPORTED_VERSION.code()).
+ setErrorMessage("Unknown resource type 123.").
+ setResourceName("baz").
+ setResourceType(123.toByte),
+ new OldAlterConfigsResourceResponse().
+ setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
+ setErrorMessage("Cluster authorization failed.").
+ setResourceName("1").
+ setResourceType(ConfigResource.Type.BROKER.id())),
+ response.data().responses().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedBeginQuorumEpoch(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleBeginQuorumEpoch(buildRequest(new
BeginQuorumEpochRequest.Builder(
- new BeginQuorumEpochRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleBeginQuorumEpoch(buildRequest(new
BeginQuorumEpochRequest.Builder(
+ new BeginQuorumEpochRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedEndQuorumEpoch(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleEndQuorumEpoch(buildRequest(new EndQuorumEpochRequest.Builder(
- new EndQuorumEpochRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleEndQuorumEpoch(buildRequest(new
EndQuorumEpochRequest.Builder(
+ new EndQuorumEpochRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedDescribeQuorum(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleDescribeQuorum(buildRequest(new DescribeQuorumRequest.Builder(
- new DescribeQuorumRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleDescribeQuorum(buildRequest(new
DescribeQuorumRequest.Builder(
+ new DescribeQuorumRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedHandleAlterPartitionRequest(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleAlterPartitionRequest(buildRequest(new
AlterPartitionRequest.Builder(
- new AlterPartitionRequestData(), false).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleAlterPartitionRequest(buildRequest(new
AlterPartitionRequest.Builder(
+ new AlterPartitionRequestData(), false).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedHandleBrokerHeartBeatRequest(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleBrokerHeartBeatRequest(buildRequest(new
BrokerHeartbeatRequest.Builder(
- new BrokerHeartbeatRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleBrokerHeartBeatRequest(buildRequest(new
BrokerHeartbeatRequest.Builder(
+ new BrokerHeartbeatRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedHandleUnregisterBroker(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleUnregisterBroker(buildRequest(new
UnregisterBrokerRequest.Builder(
- new UnregisterBrokerRequestData()).build(0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleUnregisterBroker(buildRequest(new
UnregisterBrokerRequest.Builder(
+ new UnregisterBrokerRequestData()).build(0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
@@ -429,26 +494,35 @@ class ControllerApisTest {
val request = buildRequest(brokerRegistrationRequest)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
- createControllerApis(Some(createDenyAllAuthorizer()),
mock(classOf[Controller])).handle(request,
- RequestLocal.withThreadConfinedCaching)
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None))
-
- assertNotNull(capturedResponse.getValue)
-
- val brokerRegistrationResponse =
capturedResponse.getValue.asInstanceOf[BrokerRegistrationResponse]
- assertEquals(Map(CLUSTER_AUTHORIZATION_FAILED -> 1),
- brokerRegistrationResponse.errorCounts().asScala)
+ val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()),
mock(classOf[Controller]))
+ try {
+ controllerApis.handle(request, RequestLocal.withThreadConfinedCaching)
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None))
+
+ assertNotNull(capturedResponse.getValue)
+
+ val brokerRegistrationResponse =
capturedResponse.getValue.asInstanceOf[BrokerRegistrationResponse]
+ assertEquals(Map(CLUSTER_AUTHORIZATION_FAILED -> 1),
+ brokerRegistrationResponse.errorCounts().asScala)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedHandleAlterClientQuotas(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
- new AlterClientQuotasRequestData(), 0))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleAlterClientQuotas(buildRequest(new
AlterClientQuotasRequest(
+ new AlterClientQuotasRequestData(), 0)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
@@ -478,32 +552,37 @@ class ControllerApisTest {
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
).iterator()))
val request = buildRequest(new
IncrementalAlterConfigsRequest.Builder(requestData).build(0))
- createControllerApis(Some(createDenyAllAuthorizer()),
- new
MockController.Builder().build()).handleIncrementalAlterConfigs(request)
- val capturedResponse: ArgumentCaptor[AbstractResponse] =
- ArgumentCaptor.forClass(classOf[AbstractResponse])
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None))
- assertNotNull(capturedResponse.getValue)
- val response =
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
- assertEquals(Set(new AlterConfigsResourceResponse().
+ val controllerApis = createControllerApis(Some(createDenyAllAuthorizer()),
+ new MockController.Builder().build())
+ try {
+ controllerApis.handleIncrementalAlterConfigs(request)
+ val capturedResponse: ArgumentCaptor[AbstractResponse] =
+ ArgumentCaptor.forClass(classOf[AbstractResponse])
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None))
+ assertNotNull(capturedResponse.getValue)
+ val response =
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
+ assertEquals(Set(new AlterConfigsResourceResponse().
setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER.id()),
- new AlterConfigsResourceResponse().
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
- setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message()).
- setResourceName("foo").
- setResourceType(ConfigResource.Type.TOPIC.id()),
- new AlterConfigsResourceResponse().
- setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
- setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
- setResourceName("sub").
- setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
- response.data().responses().asScala.toSet)
+ new AlterConfigsResourceResponse().
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+ setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message()).
+ setResourceName("foo").
+ setResourceType(ConfigResource.Type.TOPIC.id()),
+ new AlterConfigsResourceResponse().
+ setErrorCode(CLUSTER_AUTHORIZATION_FAILED.code()).
+ setErrorMessage(CLUSTER_AUTHORIZATION_FAILED.message()).
+ setResourceName("sub").
+ setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
+ response.data().responses().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@ParameterizedTest
@@ -567,106 +646,129 @@ class ControllerApisTest {
} else {
None
}
- createControllerApis(authorizer,
- new
MockController.Builder().build()).handleIncrementalAlterConfigs(request)
- val capturedResponse: ArgumentCaptor[AbstractResponse] =
- ArgumentCaptor.forClass(classOf[AbstractResponse])
- verify(requestChannel).sendResponse(
- ArgumentMatchers.eq(request),
- capturedResponse.capture(),
- ArgumentMatchers.eq(None))
- assertNotNull(capturedResponse.getValue)
- val response =
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
- assertEquals(Set(
- new AlterConfigsResourceResponse().
- setErrorCode(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
- setErrorMessage(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.message() else null).
- setResourceName("1").
- setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
- new AlterConfigsResourceResponse().
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate resource.").
- setResourceName("3").
- setResourceType(ConfigResource.Type.BROKER.id()),
- new AlterConfigsResourceResponse().
- setErrorCode(UNSUPPORTED_VERSION.code()).
- setErrorMessage("Unknown resource type 124.").
- setResourceName("foo").
- setResourceType(124.toByte),
- new AlterConfigsResourceResponse().
- setErrorCode(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
- setErrorMessage(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.message() else null).
- setResourceName("sub").
- setResourceType(ConfigResource.Type.CLIENT_METRICS.id()),
- new AlterConfigsResourceResponse().
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate resource.").
- setResourceName("sub1").
- setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
- response.data().responses().asScala.toSet)
+ val controllerApis = createControllerApis(authorizer, new
MockController.Builder().build())
+ try {
+ controllerApis.handleIncrementalAlterConfigs(request)
+ val capturedResponse: ArgumentCaptor[AbstractResponse] =
+ ArgumentCaptor.forClass(classOf[AbstractResponse])
+ verify(requestChannel).sendResponse(
+ ArgumentMatchers.eq(request),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None))
+ assertNotNull(capturedResponse.getValue)
+ val response =
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
+ assertEquals(Set(
+ new AlterConfigsResourceResponse().
+ setErrorCode(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
+ setErrorMessage(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.message() else null).
+ setResourceName("1").
+ setResourceType(ConfigResource.Type.BROKER_LOGGER.id()),
+ new AlterConfigsResourceResponse().
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate resource.").
+ setResourceName("3").
+ setResourceType(ConfigResource.Type.BROKER.id()),
+ new AlterConfigsResourceResponse().
+ setErrorCode(UNSUPPORTED_VERSION.code()).
+ setErrorMessage("Unknown resource type 124.").
+ setResourceName("foo").
+ setResourceType(124.toByte),
+ new AlterConfigsResourceResponse().
+ setErrorCode(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.code() else NONE.code()).
+ setErrorMessage(if (denyAllAuthorizer)
CLUSTER_AUTHORIZATION_FAILED.message() else null).
+ setResourceName("sub").
+ setResourceType(ConfigResource.Type.CLIENT_METRICS.id()),
+ new AlterConfigsResourceResponse().
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate resource.").
+ setResourceName("sub1").
+ setResourceType(ConfigResource.Type.CLIENT_METRICS.id())),
+ response.data().responses().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedHandleAlterPartitionReassignments(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleAlterPartitionReassignments(buildRequest(new
AlterPartitionReassignmentsRequest.Builder(
- new AlterPartitionReassignmentsRequestData()).build())))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleAlterPartitionReassignments(buildRequest(new
AlterPartitionReassignmentsRequest.Builder(
+ new AlterPartitionReassignmentsRequestData()).build()))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedHandleAllocateProducerIds(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleAllocateProducerIdsRequest(buildRequest(new
AllocateProducerIdsRequest.Builder(
- new AllocateProducerIdsRequestData()).build())))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleAllocateProducerIdsRequest(buildRequest(new
AllocateProducerIdsRequest.Builder(
+ new AllocateProducerIdsRequestData()).build()))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedHandleListPartitionReassignments(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleListPartitionReassignments(buildRequest(new
ListPartitionReassignmentsRequest.Builder(
- new ListPartitionReassignmentsRequestData()).build())))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleListPartitionReassignments(buildRequest(new
ListPartitionReassignmentsRequest.Builder(
+ new ListPartitionReassignmentsRequestData()).build()))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testCreateTopics(): Unit = {
val controller = new MockController.Builder().build()
val controllerApis = createControllerApis(None, controller)
- val request = new CreateTopicsRequestData().setTopics(new
CreatableTopicCollection(
- util.Arrays.asList(new
CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(3),
- new
CreatableTopic().setName("foo").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3),
- new
CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3),
- ).iterator()))
- val expectedResponse = Set(new CreatableTopicResult().setName("foo").
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate topic name."),
- new CreatableTopicResult().setName("bar").
+ try {
+ val request = new CreateTopicsRequestData().setTopics(new
CreatableTopicCollection(
+ util.Arrays.asList(new
CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor(3),
+ new
CreatableTopic().setName("foo").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("bar").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("baz").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("indescribable").setNumPartitions(2).setReplicationFactor(3),
+ new
CreatableTopic().setName("quux").setNumPartitions(2).setReplicationFactor(3),
+ ).iterator()))
+ val expectedResponse = Set(new CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."),
- new CreatableTopicResult().setName("baz").
- setErrorCode(NONE.code()).
- setTopicId(new Uuid(0L, 1L)).
- setNumPartitions(2).
- setReplicationFactor(3).
- setTopicConfigErrorCode(NONE.code()),
- new CreatableTopicResult().setName("indescribable").
- setErrorCode(NONE.code()).
- setTopicId(new Uuid(0L, 2L)).
- setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
- new CreatableTopicResult().setName("quux").
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
- setErrorMessage("Authorization failed."))
- assertEquals(expectedResponse,
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
- false,
- _ => Set("baz", "indescribable"),
- _ => Set("baz")).get().topics().asScala.toSet)
+ new CreatableTopicResult().setName("bar").
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic name."),
+ new CreatableTopicResult().setName("baz").
+ setErrorCode(NONE.code()).
+ setTopicId(new Uuid(0L, 1L)).
+ setNumPartitions(2).
+ setReplicationFactor(3).
+ setTopicConfigErrorCode(NONE.code()),
+ new CreatableTopicResult().setName("indescribable").
+ setErrorCode(NONE.code()).
+ setTopicId(new Uuid(0L, 2L)).
+ setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
+ new CreatableTopicResult().setName("quux").
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+ setErrorMessage("Authorization failed."))
+ assertEquals(expectedResponse,
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
+ false,
+ _ => Set("baz", "indescribable"),
+ _ => Set("baz")).get().topics().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
@@ -674,26 +776,30 @@ class ControllerApisTest {
def testCreateTopicsMutationQuota(throttle: Boolean): Unit = {
val controller = new MockController.Builder().build()
val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
- val topicName = "foo"
- val requestData = new CreateTopicsRequestData().setTopics(new
CreatableTopicCollection(
- util.Collections.singletonList(new
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)).iterator()))
- val request = new CreateTopicsRequest.Builder(requestData).build()
- val expectedResponseDataUnthrottled = Set(new
CreatableTopicResult().setName(topicName).
+ try {
+ val topicName = "foo"
+ val requestData = new CreateTopicsRequestData().setTopics(new
CreatableTopicCollection(
+ util.Collections.singletonList(new
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)).iterator()))
+ val request = new CreateTopicsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
CreatableTopicResult().setName(topicName).
setErrorCode(NONE.code()).
setTopicId(new Uuid(0L, 1L)).
setNumPartitions(1).
setReplicationFactor(1).
setTopicConfigErrorCode(NONE.code()))
- val expectedResponseDataThrottled = Set(new
CreatableTopicResult().setName(topicName).
- setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
- setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
- val response = handleRequest[CreateTopicsResponse](request, controllerApis)
- if (throttle) {
- assertEquals(expectedResponseDataThrottled,
response.data.topics().asScala.toSet)
- assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
- } else {
- assertEquals(expectedResponseDataUnthrottled,
response.data.topics().asScala.toSet)
- assertEquals(0, response.throttleTimeMs())
+ val expectedResponseDataThrottled = Set(new
CreatableTopicResult().setName(topicName).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[CreateTopicsResponse](request,
controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.topics().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.topics().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ } finally {
+ controllerApis.close()
}
}
@@ -702,20 +808,24 @@ class ControllerApisTest {
val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
val controller = new MockController.Builder().newInitialTopic("foo",
fooId).build()
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData().setTopicNames(
- util.Arrays.asList("foo", "bar", "quux", "quux"))
- val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+ try {
+ val request = new DeleteTopicsRequestData().setTopicNames(
+ util.Arrays.asList("foo", "bar", "quux", "quux"))
+ val expectedResponse = Set(new DeletableTopicResult().setName("quux").
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."),
- new DeletableTopicResult().setName("bar").
- setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
- setErrorMessage("This server does not host this topic-partition."),
- new DeletableTopicResult().setName("foo").setTopicId(fooId))
- assertEquals(expectedResponse,
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- true,
- _ => Set.empty,
- _ => Set.empty).get().asScala.toSet)
+ new DeletableTopicResult().setName("bar").
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("This server does not host this topic-partition."),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId))
+ assertEquals(expectedResponse,
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ true,
+ _ => Set.empty,
+ _ => Set.empty).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -725,23 +835,27 @@ class ControllerApisTest {
val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
val controller = new MockController.Builder().newInitialTopic("foo",
fooId).build()
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(quuxId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(quuxId))
- val response = Set(new
DeletableTopicResult().setName(null).setTopicId(quuxId).
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(quuxId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(quuxId))
+ val response = Set(new
DeletableTopicResult().setName(null).setTopicId(quuxId).
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic id."),
- new DeletableTopicResult().setName(null).setTopicId(barId).
- setErrorCode(UNKNOWN_TOPIC_ID.code()).
- setErrorMessage("This server does not host this topic ID."),
- new DeletableTopicResult().setName("foo").setTopicId(fooId))
- assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- true,
- _ => Set.empty,
- _ => Set.empty).get().asScala.toSet)
+ new DeletableTopicResult().setName(null).setTopicId(barId).
+ setErrorCode(UNKNOWN_TOPIC_ID.code()).
+ setErrorMessage("This server does not host this topic ID."),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId))
+ assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ true,
+ _ => Set.empty,
+ _ => Set.empty).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -753,37 +867,41 @@ class ControllerApisTest {
newInitialTopic("foo", fooId).
newInitialTopic("bar", barId).build()
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(fooId))
- request.topics().add(new
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
- request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
- val response = Set(new
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(fooId))
+ request.topics().add(new
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(bazId))
+ val response = Set(new
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Neither topic name nor id were specified."),
- new DeletableTopicResult().setName("foo").setTopicId(fooId).
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("You may not specify both topic name and topic id."),
- new DeletableTopicResult().setName("bar").setTopicId(barId).
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("The provided topic name maps to an ID that was
already supplied."),
- new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate topic name."),
- new DeletableTopicResult().setName(null).setTopicId(bazId).
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate topic id."))
- assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- false,
- names => names.toSet,
- names => names.toSet).get().asScala.toSet)
+ new DeletableTopicResult().setName("foo").setTopicId(fooId).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("You may not specify both topic name and topic id."),
+ new DeletableTopicResult().setName("bar").setTopicId(barId).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("The provided topic name maps to an ID that was
already supplied."),
+ new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic name."),
+ new DeletableTopicResult().setName(null).setTopicId(bazId).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic id."))
+ assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ names => names.toSet,
+ names => names.toSet).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -798,28 +916,32 @@ class ControllerApisTest {
newInitialTopic("baz", bazId).
newInitialTopic("quux", quuxId).build()
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
- request.topics().add(new
DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
- val response = Set(new
DeletableTopicResult().setName(null).setTopicId(barId).
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
+ request.topics().add(new
DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+ val response = Set(new
DeletableTopicResult().setName(null).setTopicId(barId).
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
- new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
- setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
- new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
- setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
- new DeletableTopicResult().setName("foo").setTopicId(fooId).
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
- setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message))
- assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- false,
- _ => Set("foo", "baz"),
- _ => Set.empty).get().asScala.toSet)
+ new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName("foo").setTopicId(fooId).
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message))
+ assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT,
request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "baz"),
+ _ => Set.empty).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -827,24 +949,28 @@ class ControllerApisTest {
val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
val controller = new MockController.Builder().build()
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
- val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
+ val expectedResponse = Set(new DeletableTopicResult().setName("foo").
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code).
setErrorMessage(UNKNOWN_TOPIC_OR_PARTITION.message),
- new DeletableTopicResult().setName("bar").
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
- setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
- new DeletableTopicResult().setName(null).setTopicId(barId).
- setErrorCode(UNKNOWN_TOPIC_ID.code).
- setErrorMessage(UNKNOWN_TOPIC_ID.message))
- assertEquals(expectedResponse,
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- false,
- _ => Set("foo"),
- _ => Set.empty).get().asScala.toSet)
+ new DeletableTopicResult().setName("bar").
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message),
+ new DeletableTopicResult().setName(null).setTopicId(barId).
+ setErrorCode(UNKNOWN_TOPIC_ID.code).
+ setErrorMessage(UNKNOWN_TOPIC_ID.message))
+ assertEquals(expectedResponse,
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo"),
+ _ => Set.empty).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -855,15 +981,19 @@ class ControllerApisTest {
newInitialTopic("foo", fooId).build()
controller.setActive(false)
val controllerApis = createControllerApis(None, controller)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
- request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
- assertEquals(classOf[NotControllerException], assertThrows(
- classOf[ExecutionException], () =>
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- false,
- _ => Set("foo", "bar"),
- _ => Set("foo", "bar")).get()).getCause.getClass)
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(fooId))
+ request.topics().add(new
DeleteTopicState().setName(null).setTopicId(barId))
+ assertEquals(classOf[NotControllerException], assertThrows(
+ classOf[ExecutionException], () =>
controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar")).get()).getCause.getClass)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -874,20 +1004,24 @@ class ControllerApisTest {
val props = new Properties()
props.put(KafkaConfig.DeleteTopicEnableProp, "false")
val controllerApis = createControllerApis(None, controller, props)
- val request = new DeleteTopicsRequestData()
- request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
- assertThrows(classOf[TopicDeletionDisabledException],
- () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
- ApiKeys.DELETE_TOPICS.latestVersion().toInt,
- false,
- _ => Set("foo", "bar"),
- _ => Set("foo", "bar")))
- assertThrows(classOf[InvalidRequestException],
- () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
- 1,
- false,
- _ => Set("foo", "bar"),
- _ => Set("foo", "bar")))
+ try {
+ val request = new DeleteTopicsRequestData()
+ request.topics().add(new
DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+ assertThrows(classOf[TopicDeletionDisabledException],
+ () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+ ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar")))
+ assertThrows(classOf[InvalidRequestException],
+ () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
+ 1,
+ false,
+ _ => Set("foo", "bar"),
+ _ => Set("foo", "bar")))
+ } finally {
+ controllerApis.close()
+ }
}
@ParameterizedTest
@@ -895,37 +1029,41 @@ class ControllerApisTest {
def testCreatePartitionsRequest(validateOnly: Boolean): Unit = {
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
- val request = new CreatePartitionsRequestData()
- request.topics().add(new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
- request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
- request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
- request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
- request.topics().add(new
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
- request.setValidateOnly(validateOnly)
-
- // Check if the controller is called correctly with the 'validateOnly'
field set appropriately.
- when(controller.createPartitions(
- any(),
- ArgumentMatchers.eq(
- Collections.singletonList(
- new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
- ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
- .completedFuture(Collections.singletonList(
- new CreatePartitionsTopicResult().setName("foo").
- setErrorCode(NONE.code()).
- setErrorMessage(null)
- )))
- assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
- setErrorCode(NONE.code()).
- setErrorMessage(null),
- new CreatePartitionsTopicResult().setName("bar").
- setErrorCode(INVALID_REQUEST.code()).
- setErrorMessage("Duplicate topic name."),
- new CreatePartitionsTopicResult().setName("baz").
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
- setErrorMessage(null)),
- controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
- _ => Set("foo", "bar")).get().asScala.toSet)
+ try {
+ val request = new CreatePartitionsRequestData()
+ request.topics().add(new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
+ request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+ request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+ request.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
+ request.topics().add(new
CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
+ request.setValidateOnly(validateOnly)
+
+ // Check if the controller is called correctly with the 'validateOnly'
field set appropriately.
+ when(controller.createPartitions(
+ any(),
+ ArgumentMatchers.eq(
+ Collections.singletonList(
+ new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))),
+ ArgumentMatchers.eq(validateOnly))).thenReturn(CompletableFuture
+ .completedFuture(Collections.singletonList(
+ new CreatePartitionsTopicResult().setName("foo").
+ setErrorCode(NONE.code()).
+ setErrorMessage(null)
+ )))
+ assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new CreatePartitionsTopicResult().setName("bar").
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage("Duplicate topic name."),
+ new CreatePartitionsTopicResult().setName("baz").
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+ setErrorMessage(null)),
+ controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
+ _ => Set("foo", "bar")).get().asScala.toSet)
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -935,35 +1073,38 @@ class ControllerApisTest {
.build()
val authorizer = mock(classOf[Authorizer])
val controllerApis = createControllerApis(Some(authorizer), controller)
-
- val requestData = new CreatePartitionsRequestData()
- requestData.topics().add(new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
- requestData.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
- val request = new CreatePartitionsRequest.Builder(requestData).build()
-
- val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo",
PatternType.LITERAL)
- val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
-
- val barResource = new ResourcePattern(ResourceType.TOPIC, "bar",
PatternType.LITERAL)
- val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
-
- when(authorizer.authorize(
- any[RequestContext],
- any[util.List[Action]]
- )).thenAnswer { invocation =>
- val actions = invocation.getArgument[util.List[Action]](1).asScala
- val results = actions.map { action =>
- if (action == fooAction) AuthorizationResult.ALLOWED
- else if (action == barAction) AuthorizationResult.DENIED
- else throw new AssertionError(s"Unexpected action $action")
+ try {
+ val requestData = new CreatePartitionsRequestData()
+ requestData.topics().add(new
CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
+ requestData.topics().add(new
CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
+ val request = new CreatePartitionsRequest.Builder(requestData).build()
+
+ val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo",
PatternType.LITERAL)
+ val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true,
true)
+
+ val barResource = new ResourcePattern(ResourceType.TOPIC, "bar",
PatternType.LITERAL)
+ val barAction = new Action(AclOperation.ALTER, barResource, 1, true,
true)
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument[util.List[Action]](1).asScala
+ val results = actions.map { action =>
+ if (action == fooAction) AuthorizationResult.ALLOWED
+ else if (action == barAction) AuthorizationResult.DENIED
+ else throw new AssertionError(s"Unexpected action $action")
+ }
+ new util.ArrayList[AuthorizationResult](results.asJava)
}
- new util.ArrayList[AuthorizationResult](results.asJava)
- }
- val response = handleRequest[CreatePartitionsResponse](request,
controllerApis)
- val results = response.data.results.asScala
- assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result
=> Errors.forCode(result.errorCode)))
- assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name
== "bar").map(result => Errors.forCode(result.errorCode)))
+ val response = handleRequest[CreatePartitionsResponse](request,
controllerApis)
+ val results = response.data.results.asScala
+ assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result
=> Errors.forCode(result.errorCode)))
+ assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED),
results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
+ } finally {
+ controllerApis.close()
+ }
}
@ParameterizedTest(name = "testCreatePartitionsMutationQuota with throttle:
{0}")
@@ -974,21 +1115,25 @@ class ControllerApisTest {
.newInitialTopic(topicName, Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"), 1)
.build()
val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
- val requestData = new CreatePartitionsRequestData()
- requestData.topics().add(new
CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2))
- val request = new CreatePartitionsRequest.Builder(requestData).build()
- val expectedResponseDataUnthrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
- setErrorCode(NONE.code()))
- val expectedResponseDataThrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
- setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
- setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
- val response = handleRequest[CreatePartitionsResponse](request,
controllerApis)
- if (throttle) {
- assertEquals(expectedResponseDataThrottled,
response.data.results().asScala.toSet)
- assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
- } else {
- assertEquals(expectedResponseDataUnthrottled,
response.data.results().asScala.toSet)
- assertEquals(0, response.throttleTimeMs())
+ try {
+ val requestData = new CreatePartitionsRequestData()
+ requestData.topics().add(new
CreatePartitionsTopic().setName(topicName).setAssignments(null).setCount(2))
+ val request = new CreatePartitionsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
+ setErrorCode(NONE.code()))
+ val expectedResponseDataThrottled = Set(new
CreatePartitionsTopicResult().setName(topicName).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[CreatePartitionsResponse](request,
controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.results().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.results().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ } finally {
+ controllerApis.close()
}
}
@@ -997,45 +1142,51 @@ class ControllerApisTest {
val authorizer = mock(classOf[Authorizer])
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(Some(authorizer), controller)
-
- val request = new ElectLeadersRequest.Builder(
- ElectionType.PREFERRED,
- null,
- 30000
- ).build()
-
- val resource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
- val actions = singletonList(new Action(AclOperation.ALTER, resource, 1,
true, true))
-
- when(authorizer.authorize(
- any[RequestContext],
- ArgumentMatchers.eq(actions)
- )).thenReturn(singletonList(AuthorizationResult.DENIED))
-
- val response = handleRequest[ElectLeadersResponse](request, controllerApis)
- assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED,
Errors.forCode(response.data.errorCode))
+ try {
+ val request = new ElectLeadersRequest.Builder(
+ ElectionType.PREFERRED,
+ null,
+ 30000
+ ).build()
+
+ val resource = new ResourcePattern(ResourceType.CLUSTER,
Resource.CLUSTER_NAME, PatternType.LITERAL)
+ val actions = singletonList(new Action(AclOperation.ALTER, resource, 1,
true, true))
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ ArgumentMatchers.eq(actions)
+ )).thenReturn(singletonList(AuthorizationResult.DENIED))
+
+ val response = handleRequest[ElectLeadersResponse](request,
controllerApis)
+ assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED,
Errors.forCode(response.data.errorCode))
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testElectLeadersHandledByController(): Unit = {
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
-
- val request = new ElectLeadersRequest.Builder(
- ElectionType.PREFERRED,
- null,
- 30000
- ).build()
-
- val responseData = new ElectLeadersResponseData()
+ try {
+ val request = new ElectLeadersRequest.Builder(
+ ElectionType.PREFERRED,
+ null,
+ 30000
+ ).build()
+
+ val responseData = new ElectLeadersResponseData()
.setErrorCode(Errors.NOT_CONTROLLER.code)
- when(controller.electLeaders(any[ControllerRequestContext],
- ArgumentMatchers.eq(request.data)
- )).thenReturn(CompletableFuture.completedFuture(responseData))
+ when(controller.electLeaders(any[ControllerRequestContext],
+ ArgumentMatchers.eq(request.data)
+ )).thenReturn(CompletableFuture.completedFuture(responseData))
- val response = handleRequest[ElectLeadersResponse](request, controllerApis)
- assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(response.data.errorCode))
+ val response = handleRequest[ElectLeadersResponse](request,
controllerApis)
+ assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(response.data.errorCode))
+ } finally {
+ controllerApis.close()
+ }
}
@Test
@@ -1044,39 +1195,42 @@ class ControllerApisTest {
val topicName = "foo"
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
+ try {
+ val findNamesFuture = CompletableFuture.completedFuture(
+ singletonMap(topicId, new ResultOrError(topicName))
+ )
+ when(controller.findTopicNames(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(singleton(topicId))
+ )).thenReturn(findNamesFuture)
- val findNamesFuture = CompletableFuture.completedFuture(
- singletonMap(topicId, new ResultOrError(topicName))
- )
- when(controller.findTopicNames(
- any[ControllerRequestContext],
- ArgumentMatchers.eq(singleton(topicId))
- )).thenReturn(findNamesFuture)
-
- val findIdsFuture = CompletableFuture.completedFuture(
- Collections.emptyMap[String, ResultOrError[Uuid]]()
- )
- when(controller.findTopicIds(
- any[ControllerRequestContext],
- ArgumentMatchers.eq(Collections.emptySet())
- )).thenReturn(findIdsFuture)
-
- val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
- deleteFuture.completeExceptionally(new NotControllerException("Controller
has moved"))
- when(controller.deleteTopics(
- any[ControllerRequestContext],
- ArgumentMatchers.eq(singleton(topicId))
- )).thenReturn(deleteFuture)
-
- val request = new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData().setTopics(singletonList(
- new DeleteTopicState().setTopicId(topicId)
- ))
- ).build()
-
- val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
- val topicIdResponse = response.data.responses.asScala.find(_.topicId ==
topicId).get
- assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(topicIdResponse.errorCode))
+ val findIdsFuture = CompletableFuture.completedFuture(
+ Collections.emptyMap[String, ResultOrError[Uuid]]()
+ )
+ when(controller.findTopicIds(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(Collections.emptySet())
+ )).thenReturn(findIdsFuture)
+
+ val deleteFuture = new CompletableFuture[util.Map[Uuid, ApiError]]()
+ deleteFuture.completeExceptionally(new
NotControllerException("Controller has moved"))
+ when(controller.deleteTopics(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(singleton(topicId))
+ )).thenReturn(deleteFuture)
+
+ val request = new DeleteTopicsRequest.Builder(
+ new DeleteTopicsRequestData().setTopics(singletonList(
+ new DeleteTopicState().setTopicId(topicId)
+ ))
+ ).build()
+
+ val response = handleRequest[DeleteTopicsResponse](request,
controllerApis)
+ val topicIdResponse = response.data.responses.asScala.find(_.topicId ==
topicId).get
+ assertEquals(Errors.NOT_CONTROLLER,
Errors.forCode(topicIdResponse.errorCode))
+ } finally {
+ controllerApis.close()
+ }
}
@ParameterizedTest(name = "testDeleteTopicsMutationQuota with throttle: {0}")
@@ -1088,23 +1242,27 @@ class ControllerApisTest {
.newInitialTopic(topicName, topicId, 1)
.build()
val controllerApis = createControllerApis(None, controller, new
Properties(), throttle)
- val requestData = new DeleteTopicsRequestData().setTopics(singletonList(
- new DeleteTopicState().setTopicId(topicId)))
- val request = new DeleteTopicsRequest.Builder(requestData).build()
- val expectedResponseDataUnthrottled = Set(new
DeletableTopicResult().setName(topicName).
- setTopicId(topicId).
- setErrorCode(NONE.code()))
- val expectedResponseDataThrottled = Set(new
DeletableTopicResult().setName(topicName).
- setTopicId(topicId).
- setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
- setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
- val response = handleRequest[DeleteTopicsResponse](request, controllerApis)
- if (throttle) {
- assertEquals(expectedResponseDataThrottled,
response.data.responses().asScala.toSet)
- assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
- } else {
- assertEquals(expectedResponseDataUnthrottled,
response.data.responses().asScala.toSet)
- assertEquals(0, response.throttleTimeMs())
+ try {
+ val requestData = new DeleteTopicsRequestData().setTopics(singletonList(
+ new DeleteTopicState().setTopicId(topicId)))
+ val request = new DeleteTopicsRequest.Builder(requestData).build()
+ val expectedResponseDataUnthrottled = Set(new
DeletableTopicResult().setName(topicName).
+ setTopicId(topicId).
+ setErrorCode(NONE.code()))
+ val expectedResponseDataThrottled = Set(new
DeletableTopicResult().setName(topicName).
+ setTopicId(topicId).
+ setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
+ setErrorMessage(THROTTLING_QUOTA_EXCEEDED.message()))
+ val response = handleRequest[DeleteTopicsResponse](request,
controllerApis)
+ if (throttle) {
+ assertEquals(expectedResponseDataThrottled,
response.data.responses().asScala.toSet)
+ assertEquals(MockControllerMutationQuota.throttleTimeMs,
response.throttleTimeMs())
+ } else {
+ assertEquals(expectedResponseDataUnthrottled,
response.data.responses().asScala.toSet)
+ assertEquals(0, response.throttleTimeMs())
+ }
+ } finally {
+ controllerApis.close()
}
}
@@ -1112,31 +1270,34 @@ class ControllerApisTest {
def testAllocateProducerIdsReturnsNotController(): Unit = {
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
-
- // We construct the future here to mimic the logic in
`QuorumController.allocateProducerIds`.
- // When an exception is raised on the original future, the `thenApply`
future is also completed
- // exceptionally, but the underlying cause is wrapped in a
`CompletionException`.
- val future = new CompletableFuture[ProducerIdsBlock]
- val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData] {
result =>
- new AllocateProducerIdsResponseData()
- .setProducerIdStart(result.firstProducerId())
- .setProducerIdLen(result.size())
+ try {
+ // We construct the future here to mimic the logic in
`QuorumController.allocateProducerIds`.
+ // When an exception is raised on the original future, the `thenApply`
future is also completed
+ // exceptionally, but the underlying cause is wrapped in a
`CompletionException`.
+ val future = new CompletableFuture[ProducerIdsBlock]
+ val thenApplyFuture = future.thenApply[AllocateProducerIdsResponseData]
{ result =>
+ new AllocateProducerIdsResponseData()
+ .setProducerIdStart(result.firstProducerId())
+ .setProducerIdLen(result.size())
+ }
+ future.completeExceptionally(new NotControllerException("Controller has
moved"))
+
+ val request = new AllocateProducerIdsRequest.Builder(
+ new AllocateProducerIdsRequestData()
+ .setBrokerId(4)
+ .setBrokerEpoch(93234)
+ ).build()
+
+ when(controller.allocateProducerIds(
+ any[ControllerRequestContext],
+ ArgumentMatchers.eq(request.data)
+ )).thenReturn(thenApplyFuture)
+
+ val response = handleRequest[AllocateProducerIdsResponse](request,
controllerApis)
+ assertEquals(Errors.NOT_CONTROLLER, response.error)
+ } finally {
+ controllerApis.close()
}
- future.completeExceptionally(new NotControllerException("Controller has
moved"))
-
- val request = new AllocateProducerIdsRequest.Builder(
- new AllocateProducerIdsRequestData()
- .setBrokerId(4)
- .setBrokerEpoch(93234)
- ).build()
-
- when(controller.allocateProducerIds(
- any[ControllerRequestContext],
- ArgumentMatchers.eq(request.data)
- )).thenReturn(thenApplyFuture)
-
- val response = handleRequest[AllocateProducerIdsResponse](request,
controllerApis)
- assertEquals(Errors.NOT_CONTROLLER, response.error)
}
@Test
@@ -1144,20 +1305,23 @@ class ControllerApisTest {
val controller = mock(classOf[Controller])
val authorizer = mock(classOf[Authorizer])
val controllerApis = createControllerApis(Some(authorizer), controller)
-
- val request = new AssignReplicasToDirsRequest.Builder(new
AssignReplicasToDirsRequestData()).build()
-
- when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(Collections.singletonList(new Action(
- AclOperation.CLUSTER_ACTION,
- new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME,
PatternType.LITERAL),
- 1, true, true
- )))))
- .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
- when(controller.assignReplicasToDirs(any[ControllerRequestContext],
ArgumentMatchers.eq(request.data)))
-
.thenReturn(FutureUtils.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
-
- val response = handleRequest[AssignReplicasToDirsResponse](request,
controllerApis)
- assertEquals(new
AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
response.data)
+ try {
+ val request = new AssignReplicasToDirsRequest.Builder(new
AssignReplicasToDirsRequestData()).build()
+
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(Collections.singletonList(new Action(
+ AclOperation.CLUSTER_ACTION,
+ new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME,
PatternType.LITERAL),
+ 1, true, true
+ )))))
+ .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+ when(controller.assignReplicasToDirs(any[ControllerRequestContext],
ArgumentMatchers.eq(request.data)))
+
.thenReturn(FutureUtils.failedFuture[AssignReplicasToDirsResponseData](Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
+
+ val response = handleRequest[AssignReplicasToDirsResponse](request,
controllerApis)
+ assertEquals(new
AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
response.data)
+ } finally {
+ controllerApis.close()
+ }
}
private def handleRequest[T <: AbstractResponse](
@@ -1185,6 +1349,7 @@ class ControllerApisTest {
throw new ClassCastException(s"Expected response with type
${classTag.runtimeClass}, " +
s"but found ${response.getClass}")
}
+
}
@Test
@@ -1205,31 +1370,45 @@ class ControllerApisTest {
}
// Calling handle does not block since we do not call get() in
ControllerApis
- createControllerApis(None,
- new MockController.Builder().build()).handle(request, null)
-
- // When we complete this future, the completion stages will fire
(including the error handler in ControllerApis#request)
- responseFuture.complete(response)
-
- // Now we should get an error response with UNSUPPORTED_VERSION
- val errorResponse = errorResponseFuture.get()
- assertEquals(1,
errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+ val controllerApis = createControllerApis(None, new
MockController.Builder().build())
+ try {
+ controllerApis.handle(request, null)
+
+ // When we complete this future, the completion stages will fire
(including the error handler in ControllerApis#request)
+ responseFuture.complete(response)
+
+ // Now we should get an error response with UNSUPPORTED_VERSION
+ val errorResponse = errorResponseFuture.get()
+ assertEquals(1,
errorResponse.errorCounts().getOrDefault(Errors.UNSUPPORTED_VERSION, 0))
+ } finally {
+ controllerApis.close()
+ }
}
@Test
def testUnauthorizedControllerRegistrationRequest(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleControllerRegistration(buildRequest(
- new ControllerRegistrationRequest(new
ControllerRegistrationRequestData(), 0.toShort))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleControllerRegistration(buildRequest(
+ new ControllerRegistrationRequest(new
ControllerRegistrationRequestData(), 0.toShort)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@Test
def testUnauthorizedDescribeClusterRequest(): Unit = {
- assertThrows(classOf[ClusterAuthorizationException], () =>
createControllerApis(
- Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
- handleDescribeCluster(buildRequest(
- new DescribeClusterRequest(new DescribeClusterRequestData(),
1.toShort))))
+ assertThrows(classOf[ClusterAuthorizationException], () => {
+ val controllerApis =
createControllerApis(Some(createDenyAllAuthorizer()), new
MockController.Builder().build())
+ try {
+ controllerApis.handleDescribeCluster(buildRequest(
+ new DescribeClusterRequest(new DescribeClusterRequestData(),
1.toShort)))
+ } finally {
+ controllerApis.close()
+ }
+ })
}
@AfterEach