This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2f260d79e7f MINOR: auto-topic creation back-off (#20663)
2f260d79e7f is described below
commit 2f260d79e7f5f8578048922283dd5961a23f045a
Author: Jinhe Zhang <[email protected]>
AuthorDate: Fri Oct 10 11:56:35 2025 -0400
MINOR: auto-topic creation back-off (#20663)
The auto-topic creation in `AutoTopicCreationManager` currently retries
creating internal topics with every heartbeat.
This change adds a simple back-off mechanism: if a topic has an unexpired
error in the error cache, or a creation request for it is already in
flight, no new topic creation request is sent for it.
Unit tests are added as well.
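For illustration, a minimal, self-contained Scala sketch of the back-off
check; the names here (BackoffFilterSketch, CachedError, errorsByTopic,
inflightTopics, filterTopicsToCreate) are hypothetical stand-ins for the
ExpiringErrorCache and in-flight tracking shown in the diff below:

    import java.util.concurrent.ConcurrentHashMap

    object BackoffFilterSketch {
      // Hypothetical stand-in for an ExpiringErrorCache entry.
      final case class CachedError(message: String, expirationTimeMs: Long)

      val errorsByTopic = new ConcurrentHashMap[String, CachedError]()
      val inflightTopics = ConcurrentHashMap.newKeySet[String]()

      // A topic is in back-off while its cached error has not yet expired.
      def hasUnexpiredError(topic: String, nowMs: Long): Boolean = {
        val entry = errorsByTopic.get(topic)
        entry != null && entry.expirationTimeMs > nowMs
      }

      // Keep only topics that are neither in back-off nor already in flight;
      // the concurrent set's add() returns false if the topic is already present.
      def filterTopicsToCreate(topics: Map[String, String], nowMs: Long): Map[String, String] =
        topics.filter { case (name, _) =>
          !hasUnexpiredError(name, nowMs) && inflightTopics.add(name)
        }
    }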
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/server/AutoTopicCreationManager.scala | 23 ++-
.../server/AutoTopicCreationManagerTest.scala | 222 +++++++++++++++++++++
2 files changed, 243 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 1398a8ad7c1..5ea254fbe7a 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -104,6 +104,11 @@ private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
}
}
+ def hasError(topicName: String, currentTimeMs: Long): Boolean = {
+ val entry = byTopic.get(topicName)
+ entry != null && entry.expirationTimeMs > currentTimeMs
+ }
+
def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = {
val result = mutable.Map.empty[String, String]
topicNames.foreach { topicName =>
@@ -173,8 +178,22 @@ class DefaultAutoTopicCreationManager(
requestContext: RequestContext,
timeoutMs: Long
): Unit = {
- if (topics.nonEmpty) {
- sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext), timeoutMs)
+ if (topics.isEmpty) {
+ return
+ }
+
+ val currentTimeMs = time.milliseconds()
+
+ // Filter out topics that are:
+ // 1. Already in error cache (back-off period)
+ // 2. Already in-flight (concurrent request)
+ val topicsToCreate = topics.filter { case (topicName, _) =>
+ !topicCreationErrorCache.hasError(topicName, currentTimeMs) &&
+ inflightTopics.add(topicName)
+ }
+
+ if (topicsToCreate.nonEmpty) {
+ sendCreateTopicRequestWithErrorCaching(topicsToCreate, Some(requestContext), timeoutMs)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index b226b58c816..eee53de824f 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -51,6 +51,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.never
import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
class AutoTopicCreationManagerTest {
@@ -903,4 +904,225 @@ class AutoTopicCreationManagerTest {
assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should have been evicted")
assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should have been evicted")
}
+
+ @Test
+ def testTopicsInBackoffAreNotRetried(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val timeoutMs = 5000L
+
+ // First attempt - trigger topic creation
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
+
+ val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate error response to cache the error
+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Verify error is cached
+ val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+
+ // Second attempt - should NOT send request because topic is in back-off
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
+
+ // Verify still only one request was sent (not retried during back-off)
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+ }
+
+ @Test
+ def testTopicsOutOfBackoffCanBeRetried(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val shortTtlMs = 1000L
+
+ // First attempt - trigger topic creation
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
+
+ val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate error response to cache the error
+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Verify error is cached
+ val cachedErrors1 = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+ assertEquals(1, cachedErrors1.size)
+
+ // Advance time beyond TTL to exit back-off period
+ mockTime.sleep(shortTtlMs + 100)
+
+ // Verify error is expired
+ val cachedErrors2 = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
+ assertTrue(cachedErrors2.isEmpty, "Error should be expired after TTL")
+
+ // Second attempt - should send request because topic is out of back-off
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
+
+ // Verify a second request was sent (retry allowed after back-off expires)
+ Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+ }
+
+ @Test
+ def testInflightTopicsAreNotRetriedConcurrently(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val timeoutMs = 5000L
+
+ // First call - should send request and mark topic as in-flight
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
+
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+
+ // Second concurrent call - should NOT send request because topic is in-flight
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
+
+ // Verify still only one request was sent (concurrent request blocked)
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+ }
+
+ @Test
+ def testBackoffAndInflightInteraction(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "backoff-topic" -> new
CreatableTopic().setName("backoff-topic").setNumPartitions(1).setReplicationFactor(1),
+ "inflight-topic" -> new
CreatableTopic().setName("inflight-topic").setNumPartitions(1).setReplicationFactor(1),
+ "normal-topic" -> new
CreatableTopic().setName("normal-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val timeoutMs = 5000L
+
+ // Create error for backoff-topic
+ val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
+ autoTopicCreationManager.createStreamsInternalTopics(backoffOnly, requestContext, timeoutMs)
+
+ val argumentCaptor1 = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor1.capture())
+
+ // Simulate error response for backoff-topic
+ val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("backoff-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ argumentCaptor1.getValue.onComplete(clientResponse)
+
+ // Make inflight-topic in-flight (without completing the request)
+ val inflightOnly = Map("inflight-topic" -> topics("inflight-topic"))
+ autoTopicCreationManager.createStreamsInternalTopics(inflightOnly, requestContext, timeoutMs)
+
+ Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ any(classOf[ControllerRequestCompletionHandler]))
+
+ // Now attempt to create all three topics together
+ autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
+
+ val argumentCaptor2 = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+ // Total 3 requests: 1 for backoff-topic, 1 for inflight-topic, 1 for normal-topic only
+ Mockito.verify(brokerToController, Mockito.times(3)).sendRequest(
+ argumentCaptor2.capture(),
+ any(classOf[ControllerRequestCompletionHandler]))
+
+ // Verify that only normal-topic was included in the last request
+ val lastRequest = argumentCaptor2.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
+ val forwardedRequestBuffer = lastRequest.requestData().duplicate()
+ val requestHeader = RequestHeader.parse(forwardedRequestBuffer)
+ val parsedRequest = CreateTopicsRequest.parse(new org.apache.kafka.common.protocol.ByteBufferAccessor(forwardedRequestBuffer), requestHeader.apiVersion())
+
+ val topicNames = parsedRequest.data().topics().asScala.map(_.name()).toSet
+ assertEquals(1, topicNames.size, "Only normal-topic should be created")
+ assertTrue(topicNames.contains("normal-topic"), "normal-topic should be in
the request")
+ assertTrue(!topicNames.contains("backoff-topic"), "backoff-topic should be
filtered (in back-off)")
+ assertTrue(!topicNames.contains("inflight-topic"), "inflight-topic should
be filtered (in-flight)")
+ }
}