RaidenE1 commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2347859336
##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -356,4 +371,226 @@ class AutoTopicCreationManagerTest {
.setNumPartitions(numPartitions)
.setReplicationFactor(replicationFactor)
}
+
+ @Test
+ def testTopicCreationErrorCaching(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with errors
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic-1")
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage("Topic 'test-topic-1' already exists.")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Trigger the completion handler
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Verify that the error was cached
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"),
mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("test-topic-1"))
+ assertEquals("Topic 'test-topic-1' already exists.",
cachedErrors("test-topic-1"))
+ }
+
+ @Test
+ def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "success-topic" -> new
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+ "failed-topic" -> new
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate mixed response - one success, one failure
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("success-topic")
+ .setErrorCode(Errors.NONE.code())
+ )
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("failed-topic")
+ .setErrorCode(Errors.POLICY_VIOLATION.code())
+ .setErrorMessage("Policy violation")
+ )
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Only the failed topic should be cached
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic",
"failed-topic", "nonexistent-topic"), mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("failed-topic"))
+ assertEquals("Policy violation", cachedErrors("failed-topic"))
+ }
+
+ @Test
+ def testErrorCacheTTL(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+
+ // First cache an error by simulating topic creation failure
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with error
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Cache the error at T0
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Verify error is cached and accessible within TTL
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+ // Advance time beyond TTL
+ mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+ // Verify error is now expired and proactively cleaned up
+ val expiredErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
+ assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively
cleaned up")
+ }
+
+ @Test
+ def testErrorCacheLRUEviction(): Unit = {
+ // Create a config with a small cache size for testing
+ val props = TestUtils.createBrokerConfig(1)
+ props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
+
props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG,
"3") // Small cache size for testing
Review Comment:
I think the cache size is set to 1000 by default. If we want to test its
limit, do we need to create 1000 exceptions in the test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]