Copilot commented on code in PR #20325:
URL: https://github.com/apache/kafka/pull/20325#discussion_r2315358917
##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -53,17 +55,47 @@ trait AutoTopicCreationManager {
requestContext: RequestContext
): Unit
+ def getTopicCreationErrors(
+ topicNames: Set[String],
+ errorCacheTtlMs: Long
+ ): Map[String, String]
+
+ def close(): Unit = {}
+
}
+case class CachedTopicCreationError(
+ errorMessage: String,
+ time: Time
+) {
+ val timestamp: Long = time.milliseconds()
Review Comment:
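The timestamp should be captured exactly once, when the error is recorded,
and stored as data. Deriving it in the case-class body from a `Time` field
causes two problems:
- `time` becomes part of the case class's generated `equals`/`hashCode`/`toString`.
- `copy()` re-runs the constructor body, so a copied entry silently gets a new,
  inconsistent timestamp.

Consider making the timestamp an explicit constructor field, e.g.
`CachedTopicCreationError(errorMessage: String, timestamp: Long)`, with the
caller passing `time.milliseconds()`. Note that the suggestion below switches
to `System.currentTimeMillis()`. That bypasses the injected `Time`, so tests
that advance `MockTime` to exercise expiry (such as `testErrorCacheTTL`) would
no longer control the cache TTL.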
```suggestion
errorMessage: String
) {
val timestamp: Long = System.currentTimeMillis()
```
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2889,9 +2889,31 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else {
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate,
requestContext);
+
+ // Check for cached topic creation errors only if there's
already a MISSING_INTERNAL_TOPICS status
+ val hasMissingInternalTopicsStatus = responseData.status() !=
null &&
+ responseData.status().stream().anyMatch(s => s.statusCode() ==
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+
+ if (hasMissingInternalTopicsStatus) {
+ // Calculate group-specific error cache TTL
+ val errorCacheTtlMs =
Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
+ .map(_.streamsSessionTimeoutMs().toLong)
+
.getOrElse(config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs().toLong)
+
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(topicsToCreate.keys.toSet,
errorCacheTtlMs)
+ if (cachedErrors.nonEmpty) {
+ val missingInternalTopicStatus =
+ responseData.status().stream().filter(x => x.statusCode()
==
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+ val creationErrorDetails = cachedErrors.map { case (topic,
error) => s"$topic ($error)" }.mkString(", ")
+ if (missingInternalTopicStatus.isPresent) {
+ missingInternalTopicStatus.get().setStatusDetail(
+ missingInternalTopicStatus.get().statusDetail() + s";
Creation failed: $creationErrorDetails."
+ )
Review Comment:
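If `statusDetail()` can return null, this concatenation mishandles it. In
Scala, `String` concatenation with a null left operand does not throw; it
embeds the literal text `null`, producing a detail such as
`null; Creation failed: ...`. Handle the null (and empty) case explicitly,
for example:
`Option(status.statusDetail()).filter(_.nonEmpty).map(_ + "; ").getOrElse("") + s"Creation failed: $creationErrorDetails."`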
##########
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala:
##########
@@ -356,4 +365,230 @@ class AutoTopicCreationManagerTest {
.setNumPartitions(numPartitions)
.setReplicationFactor(replicationFactor)
}
+
+ @Test
+ def testTopicCreationErrorCaching(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with errors
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic-1")
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage("Topic 'test-topic-1' already exists.")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Trigger the completion handler
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Verify that the error was cached
+ val defaultTtlMs =
config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs()
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic-1"),
defaultTtlMs)
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("test-topic-1"))
+ assertEquals("Topic 'test-topic-1' already exists.",
cachedErrors("test-topic-1"))
+ }
+
+ @Test
+ def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+ val topics = Map(
+ "success-topic" -> new
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+ "failed-topic" -> new
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate mixed response - one success, one failure
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("success-topic")
+ .setErrorCode(Errors.NONE.code())
+ )
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("failed-topic")
+ .setErrorCode(Errors.POLICY_VIOLATION.code())
+ .setErrorMessage("Policy violation")
+ )
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ // Only the failed topic should be cached
+ val defaultTtlMs =
config.groupCoordinatorConfig.streamsGroupSessionTimeoutMs()
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("success-topic",
"failed-topic", "nonexistent-topic"), defaultTtlMs)
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("failed-topic"))
+ assertEquals("Policy violation", cachedErrors("failed-topic"))
+ }
+
+ @Test
+ def testErrorCacheTTL(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime)
+
+
+ // First cache an error by simulating topic creation failure
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with error
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Cache the error at T0
+
argumentCaptor.getValue.asInstanceOf[ControllerRequestCompletionHandler].onComplete(clientResponse)
+
+ val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+
+ // Verify error is cached and accessible within TTL
+ val cachedErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
+ assertEquals(1, cachedErrors.size)
+ assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+ // Advance time beyond TTL
+ mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+ // Verify error is now expired and proactively cleaned up
+ val expiredErrors =
autoTopicCreationManager.getTopicCreationErrors(Set("test-topic"), shortTtlMs)
+ assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively
cleaned up")
+ }
+
+ @Test
+ def testErrorCacheLRUEviction(): Unit = {
+ // Create a config with a small cache size for testing
+ val props = TestUtils.createBrokerConfig(1)
+ props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
+
props.setProperty(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG,
"3") // Small cache size for testing
+
+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
internalTopicPartitions.toString)
+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG
, internalTopicPartitions.toString)
+
+ props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+
+ val smallCacheConfig = KafkaConfig.fromProps(props)
+
+ // Verify the configuration was properly set
+ assertEquals(3, smallCacheConfig.maxIncrementalFetchSessionCacheSlots,
"Cache size configuration should be 3")
+
+ // Replace the test class's config with our smallCacheConfig
+ // so that initializeRequestContext will use the correct config
+ config = smallCacheConfig
Review Comment:
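Modifying the test class's `config` field directly is fragile. Under JUnit 5's
default per-method lifecycle each test method gets a fresh instance, so other
tests stay unaffected only as long as the class is never switched to
`@TestInstance(Lifecycle.PER_CLASS)`. The reassignment also hides which parts of
this test depend on the small cache size. Consider creating a separate
AutoTopicCreationManager instance with `smallCacheConfig`, and letting the
request-context helper take the config as a parameter, instead of modifying
the shared `config` field.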
##########
core/src/main/scala/kafka/server/AutoTopicCreationManager.scala:
##########
@@ -96,7 +128,56 @@ class DefaultAutoTopicCreationManager(
requestContext: RequestContext
): Unit = {
if (topics.nonEmpty) {
- sendCreateTopicRequest(topics, Some(requestContext))
+ sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext))
+ }
+ }
+
+ override def getTopicCreationErrors(
+ topicNames: Set[String],
+ errorCacheTtlMs: Long
+ ): Map[String, String] = {
+ // Proactively expire old entries using the provided TTL
+ expireOldEntries(errorCacheTtlMs)
+
+ val errors = mutable.Map.empty[String, String]
+
+ // Check requested topics
+ topicNames.foreach { topicName =>
+ Option(topicCreationErrorCache.get(topicName)) match {
+ case Some(error) =>
+ errors.put(topicName, error.errorMessage)
+ case None =>
+ }
+ }
+
+ errors.toMap
+ }
+
+ /**
+ * Remove expired entries from the cache using the provided TTL.
+ * Since we use LinkedHashMap with insertion order, we only need to check
+ * entries from the beginning until we find a non-expired entry.
+ */
+ private def expireOldEntries(ttlMs: Long): Unit = {
+ val currentTime = time.milliseconds()
+
+ // Iterate and remove expired entries
+ val iterator = topicCreationErrorCache.entrySet().iterator()
+
+ breakable {
+ while (iterator.hasNext) {
+ val entry = iterator.next()
+ val cachedError = entry.getValue
+
+ if (currentTime - cachedError.timestamp > ttlMs) {
+ iterator.remove()
+ debug(s"Removed expired topic creation error cache entry for
${entry.getKey}")
+ } else {
+ // Since entries are in insertion order, if this entry is not
expired,
+ // all following entries are also not expired
+ break()
+ }
Review Comment:
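The early `break()` assumes cache entries are ordered by timestamp, and that
assumption does not always hold. The `LinkedHashMap` is configured with
`accessOrder = false` (line 94), so it keeps insertion order. Per the
`LinkedHashMap` contract, however, insertion order is not affected when an
existing key is re-inserted.

If an error for the same topic is cached again with `put` (without a prior
`remove`), the entry gets a newer timestamp but keeps its original position.
A fresh entry can then sit in front of older, expired ones, and breaking at
the first non-expired entry would leave those expired entries in the cache.

There are two fixes:
- Scan the whole map, as suggested below.
- `remove` the key before `put` when caching an error, so insertion order
  matches timestamp order and the early exit stays valid.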
```suggestion
while (iterator.hasNext) {
val entry = iterator.next()
val cachedError = entry.getValue
if (currentTime - cachedError.timestamp > ttlMs) {
iterator.remove()
debug(s"Removed expired topic creation error cache entry for
${entry.getKey}")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]