This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f260d79e7f MINOR: auto-topic creation back-off (#20663)
2f260d79e7f is described below

commit 2f260d79e7f5f8578048922283dd5961a23f045a
Author: Jinhe Zhang <[email protected]>
AuthorDate: Fri Oct 10 11:56:35 2025 -0400

    MINOR: auto-topic creation back-off (#20663)
    
    The auto-topic creation in `AutoTopicCreationManager` currently retries
    creating internal topics with every heartbeat.
    
    A simple back-off mechanism was implemented: if a topic has an unexpired
    error in the error cache, or is already among the in-flight topics, the
    topic creation request is not sent for that topic.
    
    Unit tests are added as well.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/server/AutoTopicCreationManager.scala    |  23 ++-
 .../server/AutoTopicCreationManagerTest.scala      | 222 +++++++++++++++++++++
 2 files changed, 243 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 1398a8ad7c1..5ea254fbe7a 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -104,6 +104,11 @@ private[server] class ExpiringErrorCache(maxSize: Int, 
time: Time) {
     }
   }
 
+  def hasError(topicName: String, currentTimeMs: Long): Boolean = {
+    val entry = byTopic.get(topicName)
+    entry != null && entry.expirationTimeMs > currentTimeMs
+  }
+
   def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): 
Map[String, String] = {
     val result = mutable.Map.empty[String, String]
     topicNames.foreach { topicName =>
@@ -173,8 +178,22 @@ class DefaultAutoTopicCreationManager(
     requestContext: RequestContext,
     timeoutMs: Long
   ): Unit = {
-    if (topics.nonEmpty) {
-      sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext), 
timeoutMs)
+    if (topics.isEmpty) {
+      return
+    }
+
+    val currentTimeMs = time.milliseconds()
+
+    // Filter out topics that are:
+    // 1. Already in error cache (back-off period)
+    // 2. Already in-flight (concurrent request)
+    val topicsToCreate = topics.filter { case (topicName, _) =>
+      !topicCreationErrorCache.hasError(topicName, currentTimeMs) &&
+      inflightTopics.add(topicName)
+    }
+
+    if (topicsToCreate.nonEmpty) {
+      sendCreateTopicRequestWithErrorCaching(topicsToCreate, 
Some(requestContext), timeoutMs)
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index b226b58c816..eee53de824f 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -51,6 +51,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 import org.mockito.Mockito.never
 
 import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
 
 class AutoTopicCreationManagerTest {
 
@@ -903,4 +904,225 @@ class AutoTopicCreationManagerTest {
     assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should 
have been evicted")
     assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should 
have been evicted")
   }
+
+  @Test
+  def testTopicsInBackoffAreNotRetried(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime,
+      topicErrorCacheCapacity = testCacheCapacity)
+
+    val topics = Map(
+      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val timeoutMs = 5000L
+
+    // First attempt - trigger topic creation
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate error response to cache the error
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    argumentCaptor.getValue.onComplete(clientResponse)
+
+    // Verify error is cached
+    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
+    assertEquals(1, cachedErrors.size)
+
+    // Second attempt - should NOT send request because topic is in back-off
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
+
+    // Verify still only one request was sent (not retried during back-off)
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+  }
+
+  @Test
+  def testTopicsOutOfBackoffCanBeRetried(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime,
+      topicErrorCacheCapacity = testCacheCapacity)
+
+    val topics = Map(
+      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val shortTtlMs = 1000L
+
+    // First attempt - trigger topic creation
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor.capture())
+
+    // Simulate error response to cache the error
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("test-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    argumentCaptor.getValue.onComplete(clientResponse)
+
+    // Verify error is cached
+    val cachedErrors1 = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
+    assertEquals(1, cachedErrors1.size)
+
+    // Advance time beyond TTL to exit back-off period
+    mockTime.sleep(shortTtlMs + 100)
+
+    // Verify error is expired
+    val cachedErrors2 = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
+    assertTrue(cachedErrors2.isEmpty, "Error should be expired after TTL")
+
+    // Second attempt - should send request because topic is out of back-off
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
+
+    // Verify a second request was sent (retry allowed after back-off expires)
+    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+  }
+
+  @Test
+  def testInflightTopicsAreNotRetriedConcurrently(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime,
+      topicErrorCacheCapacity = testCacheCapacity)
+
+    val topics = Map(
+      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val timeoutMs = 5000L
+
+    // First call - should send request and mark topic as in-flight
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
+
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+
+    // Second concurrent call - should NOT send request because topic is 
in-flight
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
+
+    // Verify still only one request was sent (concurrent request blocked)
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+  }
+
+  @Test
+  def testBackoffAndInflightInteraction(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      brokerToController,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime,
+      topicErrorCacheCapacity = testCacheCapacity)
+
+    val topics = Map(
+      "backoff-topic" -> new 
CreatableTopic().setName("backoff-topic").setNumPartitions(1).setReplicationFactor(1),
+      "inflight-topic" -> new 
CreatableTopic().setName("inflight-topic").setNumPartitions(1).setReplicationFactor(1),
+      "normal-topic" -> new 
CreatableTopic().setName("normal-topic").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrincipal()
+    val timeoutMs = 5000L
+
+    // Create error for backoff-topic
+    val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
+    autoTopicCreationManager.createStreamsInternalTopics(backoffOnly, 
requestContext, timeoutMs)
+
+    val argumentCaptor1 = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      argumentCaptor1.capture())
+
+    // Simulate error response for backoff-topic
+    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      .setName("backoff-topic")
+      .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+      .setErrorMessage("Invalid replication factor")
+    createTopicsResponseData.topics().add(topicResult)
+
+    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
+    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+    val clientResponse = new ClientResponse(header, null, null,
+      0, 0, false, null, null, createTopicsResponse)
+
+    argumentCaptor1.getValue.onComplete(clientResponse)
+
+    // Make inflight-topic in-flight (without completing the request)
+    val inflightOnly = Map("inflight-topic" -> topics("inflight-topic"))
+    autoTopicCreationManager.createStreamsInternalTopics(inflightOnly, 
requestContext, timeoutMs)
+
+    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+
+    // Now attempt to create all three topics together
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
+
+    val argumentCaptor2 = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+    // Total 3 requests: 1 for backoff-topic, 1 for inflight-topic, 1 for 
normal-topic only
+    Mockito.verify(brokerToController, Mockito.times(3)).sendRequest(
+      argumentCaptor2.capture(),
+      any(classOf[ControllerRequestCompletionHandler]))
+
+    // Verify that only normal-topic was included in the last request
+    val lastRequest = 
argumentCaptor2.getValue.asInstanceOf[EnvelopeRequest.Builder]
+      .build(ApiKeys.ENVELOPE.latestVersion())
+    val forwardedRequestBuffer = lastRequest.requestData().duplicate()
+    val requestHeader = RequestHeader.parse(forwardedRequestBuffer)
+    val parsedRequest = CreateTopicsRequest.parse(new 
org.apache.kafka.common.protocol.ByteBufferAccessor(forwardedRequestBuffer),
+      requestHeader.apiVersion())
+
+    val topicNames = parsedRequest.data().topics().asScala.map(_.name()).toSet
+    assertEquals(1, topicNames.size, "Only normal-topic should be created")
+    assertTrue(topicNames.contains("normal-topic"), "normal-topic should be in 
the request")
+    assertTrue(!topicNames.contains("backoff-topic"), "backoff-topic should be 
filtered (in back-off)")
+    assertTrue(!topicNames.contains("inflight-topic"), "inflight-topic should 
be filtered (in-flight)")
+  }
 }

Reply via email to