aliehsaeedii commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2273419281
##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -393,4 +403,119 @@ class AutoTopicCreationManagerTest {
.setNumPartitions(numPartitions)
.setReplicationFactor(replicationFactor)
}
+
+ @Test
+ def testTopicCreationErrorCaching(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with errors
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic-1")
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage("Topic 'test-topic-1' already exists.")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Trigger the completion handler
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Verify that the error was cached
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"))
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("test-topic-1"))
+ assertEquals("Topic 'test-topic-1' already exists.",
cachedErrors("test-topic-1"))
+ }
+
+ @Test
+ def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "success-topic" -> new
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+ "failed-topic" -> new
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate mixed response - one success, one failure
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("success-topic")
+ .setErrorCode(Errors.NONE.code())
+ )
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("failed-topic")
+ .setErrorCode(Errors.POLICY_VIOLATION.code())
+ .setErrorMessage("Policy violation")
+ )
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Only the failed topic should be cached
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic",
"failed-topic", "nonexistent-topic"))
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("failed-topic"))
+ assertEquals("Policy violation", cachedErrors("failed-topic"))
+ }
+
+ @Test
+ def testErrorCacheTTL(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ // 测试getTopicCreationErrors在没有缓存时返回空
+ val initialResult =
autoTopicCreationManager.getTopicCreationErrors(Set("nonexistent-topic"))
+ assertTrue(initialResult.isEmpty)
+
+ // 让时间前进,再次查询确保仍然为空
Review Comment:
This code comment is written in non-English (Chinese) characters; please rewrite it in English. The same issue as noted above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]