This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8ba41a2d0df MINOR: Expose internal topic creation errors to the user
(#20325)
8ba41a2d0df is described below
commit 8ba41a2d0dfac115422ff8eb485a7ce00dd7ff84
Author: Jinhe Zhang <[email protected]>
AuthorDate: Tue Sep 16 14:52:39 2025 -0400
MINOR: Expose internal topic creation errors to the user (#20325)
This PR introduces an ExpiringErrorCache that temporarily stores topic
creation errors, allowing the system to provide detailed failure reasons
in subsequent heartbeat responses.
Key Designs:
Time-based expiration: Errors are cached with a TTL based on the
streams group heartbeat interval (2x heartbeat interval). This ensures
errors remain available for at least one retry cycle while preventing
unbounded growth. 2. Priority queue for efficient expiry: Uses a
min-heap to track entries by expiration time, enabling efficient cleanup
of expired entries during cache operations. 3. Capacity enforcement:
Limits cache size to prevent memory issues under high error rates. When
capacity is exceeded, oldest entries are evicted first. 4. Reference
equality checks: Uses eq for object identity comparison when cleaning up
stale entries, avoiding expensive value comparisons while correctly
handling entry updates.
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/server/AutoTopicCreationManager.scala | 190 +++++++++-
.../src/main/scala/kafka/server/BrokerServer.scala | 5 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 29 +-
.../server/AutoTopicCreationManagerTest.scala | 246 ++++++++++++-
.../unit/kafka/server/ExpiringErrorCacheTest.scala | 400 +++++++++++++++++++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 62 +++-
6 files changed, 913 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index d23fc7db88a..5da400e29de 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Properties}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
@@ -35,6 +36,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.quota.ControllerMutationQuota
+import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -50,21 +52,96 @@ trait AutoTopicCreationManager {
def createStreamsInternalTopics(
topics: Map[String, CreatableTopic],
- requestContext: RequestContext
+ requestContext: RequestContext,
+ timeoutMs: Long
): Unit
+ def getStreamsInternalTopicCreationErrors(
+ topicNames: Set[String],
+ currentTimeMs: Long
+ ): Map[String, String]
+
+ def close(): Unit = {}
+
}
+/**
+ * Thread-safe cache that stores topic creation errors with per-entry
expiration.
+ * - Expiration: maintained by a min-heap (priority queue) on expiration time
+ * - Capacity: enforced by evicting entries with earliest expiration time (not
LRU)
+ * - Updates: old entries remain in queue but are ignored via reference
equality check
+ */
+private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
+
+ private case class Entry(topicName: String, errorMessage: String,
expirationTimeMs: Long)
+
+ private val byTopic = new ConcurrentHashMap[String, Entry]()
+ private val expiryQueue = new java.util.PriorityQueue[Entry](11, new
java.util.Comparator[Entry] {
+ override def compare(a: Entry, b: Entry): Int =
java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
+ })
+ private val lock = new ReentrantLock()
+
+ def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
+ lock.lock()
+ try {
+ val currentTimeMs = time.milliseconds()
+ val expirationTimeMs = currentTimeMs + ttlMs
+ val entry = Entry(topicName, errorMessage, expirationTimeMs)
+ byTopic.put(topicName, entry)
+ expiryQueue.add(entry)
+
+ // Clean up expired entries and enforce capacity
+ while (!expiryQueue.isEmpty &&
+ (expiryQueue.peek().expirationTimeMs <= currentTimeMs ||
byTopic.size() > maxSize)) {
+ val evicted = expiryQueue.poll()
+ val current = byTopic.get(evicted.topicName)
+ if (current != null && (current eq evicted)) {
+ byTopic.remove(evicted.topicName)
+ }
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long):
Map[String, String] = {
+ val result = mutable.Map.empty[String, String]
+ topicNames.foreach { topicName =>
+ val entry = byTopic.get(topicName)
+ if (entry != null && entry.expirationTimeMs > currentTimeMs) {
+ result.put(topicName, entry.errorMessage)
+ }
+ }
+ result.toMap
+ }
+
+ private[server] def clear(): Unit = {
+ lock.lock()
+ try {
+ byTopic.clear()
+ expiryQueue.clear()
+ } finally {
+ lock.unlock()
+ }
+ }
+}
+
+
class DefaultAutoTopicCreationManager(
config: KafkaConfig,
channelManager: NodeToControllerChannelManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
- shareCoordinator: ShareCoordinator
+ shareCoordinator: ShareCoordinator,
+ time: Time,
+ topicErrorCacheCapacity: Int = 1000
) extends AutoTopicCreationManager with Logging {
private val inflightTopics = Collections.newSetFromMap(new
ConcurrentHashMap[String, java.lang.Boolean]())
+ // Hardcoded default capacity; can be overridden in tests via constructor
param
+ private val topicCreationErrorCache = new
ExpiringErrorCache(topicErrorCacheCapacity, time)
+
/**
* Initiate auto topic creation for the given topics.
*
@@ -93,13 +170,21 @@ class DefaultAutoTopicCreationManager(
override def createStreamsInternalTopics(
topics: Map[String, CreatableTopic],
- requestContext: RequestContext
+ requestContext: RequestContext,
+ timeoutMs: Long
): Unit = {
if (topics.nonEmpty) {
- sendCreateTopicRequest(topics, Some(requestContext))
+ sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext),
timeoutMs)
}
}
+ override def getStreamsInternalTopicCreationErrors(
+ topicNames: Set[String],
+ currentTimeMs: Long
+ ): Map[String, String] = {
+ topicCreationErrorCache.getErrorsForTopics(topicNames, currentTimeMs)
+ }
+
private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
requestContext: Option[RequestContext]
@@ -264,4 +349,101 @@ class DefaultAutoTopicCreationManager(
(creatableTopics, uncreatableTopics)
}
+
+ private def sendCreateTopicRequestWithErrorCaching(
+ creatableTopics: Map[String, CreatableTopic],
+ requestContext: Option[RequestContext],
+ timeoutMs: Long
+ ): Seq[MetadataResponseTopic] = {
+ val topicsToCreate = new
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
+ topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
+
+ val createTopicsRequest = new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTimeoutMs(config.requestTimeoutMs)
+ .setTopics(topicsToCreate)
+ )
+
+ val requestCompletionHandler = new ControllerRequestCompletionHandler {
+ override def onTimeout(): Unit = {
+ clearInflightRequests(creatableTopics)
+ debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
+ cacheTopicCreationErrors(creatableTopics.keys.toSet, "Auto topic
creation timed out.", timeoutMs)
+ }
+
+ override def onComplete(response: ClientResponse): Unit = {
+ clearInflightRequests(creatableTopics)
+ if (response.authenticationException() != null) {
+ val authException = response.authenticationException()
+ warn(s"Auto topic creation failed for ${creatableTopics.keys} with
authentication exception: ${authException.getMessage}")
+ cacheTopicCreationErrors(creatableTopics.keys.toSet,
authException.getMessage, timeoutMs)
+ } else if (response.versionMismatch() != null) {
+ val versionException = response.versionMismatch()
+ warn(s"Auto topic creation failed for ${creatableTopics.keys} with
version mismatch exception: ${versionException.getMessage}")
+ cacheTopicCreationErrors(creatableTopics.keys.toSet,
versionException.getMessage, timeoutMs)
+ } else {
+ response.responseBody() match {
+ case createTopicsResponse: CreateTopicsResponse =>
+ cacheTopicCreationErrorsFromResponse(createTopicsResponse,
timeoutMs)
+ case _ =>
+ debug(s"Auto topic creation completed for
${creatableTopics.keys} with response ${response.responseBody}.")
+ }
+ }
+ }
+ }
+
+ val request = requestContext.map { context =>
+ val requestVersion =
+ channelManager.controllerApiVersions.toScala match {
+ case None =>
+ // We will rely on the Metadata request to be retried in the case
+ // that the latest version is not usable by the controller.
+ ApiKeys.CREATE_TOPICS.latestVersion()
+ case Some(nodeApiVersions) =>
+ nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+ }
+
+ // Borrow client information such as client id and correlation id from
the original request,
+ // in order to correlate the create request with the original metadata
request.
+ val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+ requestVersion,
+ context.clientId,
+ context.correlationId)
+ ForwardingManager.buildEnvelopeRequest(context,
+
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
+ }.getOrElse(createTopicsRequest)
+
+ channelManager.sendRequest(request, requestCompletionHandler)
+
+ val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
+ new MetadataResponseTopic()
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ .setName(topic)
+ .setIsInternal(Topic.isInternal(topic))
+ }
+
+ creatableTopicResponses
+ }
+
+ private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage:
String, ttlMs: Long): Unit = {
+ topicNames.foreach { topicName =>
+ topicCreationErrorCache.put(topicName, errorMessage, ttlMs)
+ }
+ }
+
+ private def cacheTopicCreationErrorsFromResponse(response:
CreateTopicsResponse, ttlMs: Long): Unit = {
+ response.data().topics().forEach { topicResult =>
+ if (topicResult.errorCode() != Errors.NONE.code()) {
+ val errorMessage = Option(topicResult.errorMessage())
+ .filter(_.nonEmpty)
+ .getOrElse(Errors.forCode(topicResult.errorCode()).message())
+ topicCreationErrorCache.put(topicResult.name(), errorMessage, ttlMs)
+ debug(s"Cached topic creation error for ${topicResult.name()}:
$errorMessage")
+ }
+ }
+ }
+
+ override def close(): Unit = {
+ topicCreationErrorCache.clear()
+ }
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index dccd07b83c6..3ded033020b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -387,7 +387,7 @@ class BrokerServer(
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config, clientToControllerChannelManager, groupCoordinator,
- transactionCoordinator, shareCoordinator)
+ transactionCoordinator, shareCoordinator, time)
dynamicConfigHandlers = Map[ConfigType, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config,
quotaManagers),
@@ -780,6 +780,9 @@ class BrokerServer(
if (shareCoordinator != null)
CoreUtils.swallow(shareCoordinator.shutdown(), this)
+ if (autoTopicCreationManager != null)
+ CoreUtils.swallow(autoTopicCreationManager.close(), this)
+
if (assignmentsManager != null)
CoreUtils.swallow(assignmentsManager.close(), this)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1276d6e72a9..d3935c8e507 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2812,10 +2812,35 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}
} else {
-
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate,
requestContext);
+ // Compute group-specific timeout for caching errors (2 *
heartbeat interval)
+ val heartbeatIntervalMs =
Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
+ .map(_.streamsHeartbeatIntervalMs().toLong)
+
.getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)
+ val timeoutMs = heartbeatIntervalMs * 2
+
+
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate,
requestContext, timeoutMs)
+
+ // Check for cached topic creation errors only if there's
already a MISSING_INTERNAL_TOPICS status
+ val hasMissingInternalTopicsStatus = responseData.status() !=
null &&
+ responseData.status().stream().anyMatch(s => s.statusCode() ==
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+
+ if (hasMissingInternalTopicsStatus) {
+ val currentTimeMs = time.milliseconds()
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicsToCreate.keys.toSet,
currentTimeMs)
+ if (cachedErrors.nonEmpty) {
+ val missingInternalTopicStatus =
+ responseData.status().stream().filter(x => x.statusCode()
==
StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
+ val creationErrorDetails = cachedErrors.map { case (topic,
error) => s"$topic ($error)" }.mkString(", ")
+ if (missingInternalTopicStatus.isPresent) {
+ val existingDetail =
Option(missingInternalTopicStatus.get().statusDetail()).getOrElse("")
+ missingInternalTopicStatus.get().setStatusDetail(
+ existingDetail + s"; Creation failed:
$creationErrorDetails."
+ )
+ }
+ }
+ }
}
}
-
requestHelper.sendMaybeThrottle(request, new
StreamsGroupHeartbeatResponse(responseData))
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 3065588a87a..cd17d7df2b3 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.protocol.{ApiKeys,
ByteBufferAccessor, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.server.util.MockTime
import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorConfig}
import org.apache.kafka.metadata.MetadataCache
@@ -45,14 +46,15 @@ import org.apache.kafka.server.quota.ControllerMutationQuota
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.never
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
+import org.mockito.Mockito.never
import scala.collection.{Map, Seq}
class AutoTopicCreationManagerTest {
private val requestTimeout = 100
+ private val testCacheCapacity = 3
private var config: KafkaConfig = _
private val metadataCache = Mockito.mock(classOf[MetadataCache])
private val brokerToController =
Mockito.mock(classOf[NodeToControllerChannelManager])
@@ -60,6 +62,7 @@ class AutoTopicCreationManagerTest {
private val transactionCoordinator =
Mockito.mock(classOf[TransactionCoordinator])
private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator])
private var autoTopicCreationManager: AutoTopicCreationManager = _
+ private val mockTime = new MockTime(0L, 0L)
private val internalTopicPartitions = 2
private val internalTopicReplicationFactor: Short = 2
@@ -76,6 +79,8 @@ class AutoTopicCreationManagerTest {
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
+ // Set a short group max session timeout for testing TTL (1 second)
+
props.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
"1000")
config = KafkaConfig.fromProps(props)
val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1,
"host1", 1))
@@ -115,7 +120,9 @@ class AutoTopicCreationManagerTest {
brokerToController,
groupCoordinator,
transactionCoordinator,
- shareCoordinator)
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
topicsCollection.add(getNewTopic(topicName, numPartitions,
replicationFactor))
@@ -231,9 +238,11 @@ class AutoTopicCreationManagerTest {
brokerToController,
groupCoordinator,
transactionCoordinator,
- shareCoordinator)
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
Mockito.verify(brokerToController).sendRequest(
@@ -267,9 +276,11 @@ class AutoTopicCreationManagerTest {
brokerToController,
groupCoordinator,
transactionCoordinator,
- shareCoordinator)
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
Mockito.verify(brokerToController, never()).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
@@ -288,9 +299,11 @@ class AutoTopicCreationManagerTest {
brokerToController,
groupCoordinator,
transactionCoordinator,
- shareCoordinator)
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
Mockito.verify(brokerToController).sendRequest(
@@ -319,7 +332,9 @@ class AutoTopicCreationManagerTest {
brokerToController,
groupCoordinator,
transactionCoordinator,
- shareCoordinator)
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.CREATE_TOPICS.id)
@@ -356,4 +371,217 @@ class AutoTopicCreationManagerTest {
.setNumPartitions(numPartitions)
.setReplicationFactor(replicationFactor)
}
+
+ @Test
+ def testTopicCreationErrorCaching(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with errors
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic-1")
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage("Topic 'test-topic-1' already exists.")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Trigger the completion handler
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Verify that the error was cached
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"),
mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("test-topic-1"))
+ assertEquals("Topic 'test-topic-1' already exists.",
cachedErrors("test-topic-1"))
+ }
+
+ @Test
+ def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+ val topics = Map(
+ "success-topic" -> new
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
+ "failed-topic" -> new
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate mixed response - one success, one failure
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("success-topic")
+ .setErrorCode(Errors.NONE.code())
+ )
+ createTopicsResponseData.topics().add(
+ new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("failed-topic")
+ .setErrorCode(Errors.POLICY_VIOLATION.code())
+ .setErrorMessage("Policy violation")
+ )
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Only the failed topic should be cached
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic",
"failed-topic", "nonexistent-topic"), mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertTrue(cachedErrors.contains("failed-topic"))
+ assertEquals("Policy violation", cachedErrors("failed-topic"))
+ }
+
+ @Test
+ def testErrorCacheTTL(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = testCacheCapacity)
+
+
+ // First cache an error by simulating topic creation failure
+ val topics = Map(
+ "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
+ )
+ val requestContext = initializeRequestContextWithUserPrincipal()
+ val shortTtlMs = 1000L // Use 1 second TTL for faster testing
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate a CreateTopicsResponse with error
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName("test-topic")
+ .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("Invalid replication factor")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ // Cache the error at T0
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Verify error is cached and accessible within TTL
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
+ assertEquals(1, cachedErrors.size)
+ assertEquals("Invalid replication factor", cachedErrors("test-topic"))
+
+ // Advance time beyond TTL
+ mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
+
+ // Verify error is now expired and proactively cleaned up
+ val expiredErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
+ assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively
cleaned up")
+ }
+
+ @Test
+ def testErrorCacheExpirationBasedEviction(): Unit = {
+ // Create manager with small cache size for testing
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ brokerToController,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicErrorCacheCapacity = 3)
+
+ val requestContext = initializeRequestContextWithUserPrincipal()
+
+ // Create 5 topics to exceed the cache size of 3
+ val topicNames = (1 to 5).map(i => s"test-topic-$i")
+
+ // Add errors for all 5 topics to the cache
+ topicNames.zipWithIndex.foreach { case (topicName, idx) =>
+ val topics = Map(
+ topicName -> new
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
+ )
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
+
+ val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
+ Mockito.verify(brokerToController, Mockito.atLeastOnce()).sendRequest(
+ any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+ argumentCaptor.capture())
+
+ // Simulate error response for this topic
+ val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage(s"Topic '$topicName' already exists.")
+ createTopicsResponseData.topics().add(topicResult)
+
+ val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
+ val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
+ val clientResponse = new ClientResponse(header, null, null,
+ 0, 0, false, null, null, createTopicsResponse)
+
+ argumentCaptor.getValue.onComplete(clientResponse)
+
+ // Advance time slightly between additions to ensure different timestamps
+ mockTime.sleep(10)
+
+ }
+
+ // With cache size of 3, topics 1 and 2 should have been evicted
+ val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicNames.toSet,
mockTime.milliseconds())
+
+ // Only the last 3 topics should be in the cache (topics 3, 4, 5)
+ assertEquals(3, cachedErrors.size, "Cache should contain only the most
recent 3 entries")
+ assertTrue(cachedErrors.contains("test-topic-3"), "test-topic-3 should be
in cache")
+ assertTrue(cachedErrors.contains("test-topic-4"), "test-topic-4 should be
in cache")
+ assertTrue(cachedErrors.contains("test-topic-5"), "test-topic-5 should be
in cache")
+ assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should
have been evicted")
+ assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should
have been evicted")
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ExpiringErrorCacheTest.scala
b/core/src/test/scala/unit/kafka/server/ExpiringErrorCacheTest.scala
new file mode 100644
index 00000000000..be02f95374c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ExpiringErrorCacheTest.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.server.util.MockTime
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test}
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.Random
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class ExpiringErrorCacheTest {
+
+ private var mockTime: MockTime = _
+ private var cache: ExpiringErrorCache = _
+
+ @BeforeEach
+ def setUp(): Unit = {
+ mockTime = new MockTime()
+ }
+
+ // Basic Functionality Tests
+
+ @Test
+ def testPutAndGet(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 2000L)
+
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds())
+ assertEquals(2, errors.size)
+ assertEquals("error1", errors("topic1"))
+ assertEquals("error2", errors("topic2"))
+ }
+
+ @Test
+ def testGetNonExistentTopic(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds())
+ assertEquals(1, errors.size)
+ assertEquals("error1", errors("topic1"))
+ assertFalse(errors.contains("topic2"))
+ }
+
+ @Test
+ def testUpdateExistingEntry(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+ assertEquals("error1", cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds())("topic1"))
+
+ // Update with new error
+ cache.put("topic1", "error2", 2000L)
+ assertEquals("error2", cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds())("topic1"))
+ }
+
+ @Test
+ def testGetMultipleTopics(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 1000L)
+ cache.put("topic3", "error3", 1000L)
+
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic3", "topic4"),
mockTime.milliseconds())
+ assertEquals(2, errors.size)
+ assertEquals("error1", errors("topic1"))
+ assertEquals("error3", errors("topic3"))
+ assertFalse(errors.contains("topic2"))
+ assertFalse(errors.contains("topic4"))
+ }
+
+ // Expiration Tests
+
+ @Test
+ def testExpiredEntryNotReturned(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+
+ // Entry should be available before expiration
+ assertEquals(1, cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds()).size)
+
+ // Advance time past expiration
+ mockTime.sleep(1001L)
+
+ // Entry should not be returned after expiration
+ assertTrue(cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds()).isEmpty)
+ }
+
+ @Test
+ def testExpiredEntriesCleanedOnPut(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ // Add entries with different TTLs
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 2000L)
+
+ // Advance time to expire topic1 but not topic2
+ mockTime.sleep(1500L)
+
+ // Add a new entry - this should trigger cleanup
+ cache.put("topic3", "error3", 1000L)
+
+ // Verify only non-expired entries remain
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"),
mockTime.milliseconds())
+ assertEquals(2, errors.size)
+ assertFalse(errors.contains("topic1"))
+ assertEquals("error2", errors("topic2"))
+ assertEquals("error3", errors("topic3"))
+ }
+
+ @Test
+ def testMixedExpiredAndValidEntries(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 500L)
+ cache.put("topic2", "error2", 1000L)
+ cache.put("topic3", "error3", 1500L)
+
+ // Advance time to expire only topic1
+ mockTime.sleep(600L)
+
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"),
mockTime.milliseconds())
+ assertEquals(2, errors.size)
+ assertFalse(errors.contains("topic1"))
+ assertTrue(errors.contains("topic2"))
+ assertTrue(errors.contains("topic3"))
+ }
+
+ // Capacity Enforcement Tests
+
+ @Test
+ def testCapacityEnforcement(): Unit = {
+ cache = new ExpiringErrorCache(3, mockTime)
+
+ // Add 5 entries, exceeding capacity of 3
+ for (i <- 1 to 5) {
+ cache.put(s"topic$i", s"error$i", 1000L)
+ // Small time advance between entries to ensure different insertion order
+ mockTime.sleep(10L)
+ }
+
+ val errors = cache.getErrorsForTopics((1 to 5).map(i => s"topic$i").toSet,
mockTime.milliseconds())
+ assertEquals(3, errors.size)
+
+ // The cache evicts by earliest expiration time
+ // Since all have same TTL, earliest inserted (topic1, topic2) should be
evicted
+ assertFalse(errors.contains("topic1"))
+ assertFalse(errors.contains("topic2"))
+ assertTrue(errors.contains("topic3"))
+ assertTrue(errors.contains("topic4"))
+ assertTrue(errors.contains("topic5"))
+ }
+
+ @Test
+ def testEvictionOrder(): Unit = {
+ cache = new ExpiringErrorCache(3, mockTime)
+
+ // Add entries with different TTLs
+ cache.put("topic1", "error1", 3000L) // Expires at 3000
+ mockTime.sleep(100L)
+ cache.put("topic2", "error2", 1000L) // Expires at 1100
+ mockTime.sleep(100L)
+ cache.put("topic3", "error3", 2000L) // Expires at 2200
+ mockTime.sleep(100L)
+ cache.put("topic4", "error4", 500L) // Expires at 800
+
+ // With capacity 3, topic4 (earliest expiration) should be evicted
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3",
"topic4"), mockTime.milliseconds())
+ assertEquals(3, errors.size)
+ assertTrue(errors.contains("topic1"))
+ assertTrue(errors.contains("topic2"))
+ assertTrue(errors.contains("topic3"))
+ assertFalse(errors.contains("topic4"))
+ }
+
+ @Test
+ def testCapacityWithDifferentTTLs(): Unit = {
+ cache = new ExpiringErrorCache(2, mockTime)
+
+ cache.put("topic1", "error1", 5000L) // Long TTL
+ cache.put("topic2", "error2", 100L) // Short TTL
+ cache.put("topic3", "error3", 3000L) // Medium TTL
+
+ // topic2 has earliest expiration, so it should be evicted
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"),
mockTime.milliseconds())
+ assertEquals(2, errors.size)
+ assertTrue(errors.contains("topic1"))
+ assertFalse(errors.contains("topic2"))
+ assertTrue(errors.contains("topic3"))
+ }
+
+ // Update and Stale Entry Tests
+
+ @Test
+ def testUpdateDoesNotLeaveStaleEntries(): Unit = {
+ cache = new ExpiringErrorCache(3, mockTime)
+
+ // Fill cache to capacity
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 1000L)
+ cache.put("topic3", "error3", 1000L)
+
+ // Update topic2 with longer TTL
+ cache.put("topic2", "error2_updated", 5000L)
+
+ // Add new entry to trigger eviction
+ cache.put("topic4", "error4", 1000L)
+
+ // Should evict topic1 or topic3 (earliest expiration), not the updated
topic2
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3",
"topic4"), mockTime.milliseconds())
+ assertEquals(3, errors.size)
+ assertTrue(errors.contains("topic2"))
+ assertEquals("error2_updated", errors("topic2"))
+ }
+
+ @Test
+ def testStaleEntriesInQueueHandledCorrectly(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ // Add and update same topic multiple times
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic1", "error2", 2000L)
+ cache.put("topic1", "error3", 3000L)
+
+ // Only latest value should be returned
+ val errors = cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds())
+ assertEquals(1, errors.size)
+ assertEquals("error3", errors("topic1"))
+
+ // Advance time to expire first two entries
+ mockTime.sleep(2500L)
+
+ // Force cleanup by adding new entry
+ cache.put("topic2", "error_new", 1000L)
+
+ // topic1 should still be available with latest value
+ val errorsAfterCleanup = cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds())
+ assertEquals(1, errorsAfterCleanup.size)
+ assertEquals("error3", errorsAfterCleanup("topic1"))
+ }
+
+ // Edge Cases
+
+ @Test
+ def testEmptyCache(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds())
+ assertTrue(errors.isEmpty)
+ }
+
+ @Test
+ def testSingleEntryCache(): Unit = {
+ cache = new ExpiringErrorCache(1, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 1000L)
+
+ // Only most recent should remain
+ val errors = cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds())
+ assertEquals(1, errors.size)
+ assertFalse(errors.contains("topic1"))
+ assertTrue(errors.contains("topic2"))
+ }
+
+ @Test
+ def testZeroTTL(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 0L)
+
+ // Entry expires immediately
+ assertTrue(cache.getErrorsForTopics(Set("topic1"),
mockTime.milliseconds()).isEmpty)
+ }
+
+ @Test
+ def testClearOperation(): Unit = {
+ cache = new ExpiringErrorCache(10, mockTime)
+
+ cache.put("topic1", "error1", 1000L)
+ cache.put("topic2", "error2", 1000L)
+
+ assertEquals(2, cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds()).size)
+
+ cache.clear()
+
+ assertTrue(cache.getErrorsForTopics(Set("topic1", "topic2"),
mockTime.milliseconds()).isEmpty)
+ }
+
+ // Concurrent Access Tests
+
+ @Test
+ def testConcurrentPutOperations(): Unit = {
+ cache = new ExpiringErrorCache(100, mockTime)
+ val numThreads = 10
+ val numTopicsPerThread = 20
+ val latch = new CountDownLatch(numThreads)
+
+ (1 to numThreads).foreach { threadId =>
+ Future {
+ try {
+ for (i <- 1 to numTopicsPerThread) {
+ cache.put(s"topic_${threadId}_$i", s"error_${threadId}_$i", 1000L)
+ }
+ } finally {
+ latch.countDown()
+ }
+ }
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS))
+
+ // Verify all entries were added
+ val allTopics = (1 to numThreads).flatMap { threadId =>
+ (1 to numTopicsPerThread).map(i => s"topic_${threadId}_$i")
+ }.toSet
+
+ val errors = cache.getErrorsForTopics(allTopics, mockTime.milliseconds())
+ assertEquals(100, errors.size) // Limited by cache capacity
+ }
+
+ @Test
+ def testConcurrentPutAndGet(): Unit = {
+ cache = new ExpiringErrorCache(100, mockTime)
+ val numOperations = 1000
+ val random = new Random()
+ val topics = (1 to 50).map(i => s"topic$i").toArray
+
+ val futures = (1 to numOperations).map { _ =>
+ Future {
+ if (random.nextBoolean()) {
+ // Put operation
+ val topic = topics(random.nextInt(topics.length))
+ cache.put(topic, s"error_${random.nextInt()}", 1000L)
+ } else {
+ // Get operation
+ val topicsToGet = Set(topics(random.nextInt(topics.length)))
+ cache.getErrorsForTopics(topicsToGet, mockTime.milliseconds())
+ }
+ }
+ }
+
+ // Wait for all operations to complete
+ Future.sequence(futures).map(_ => ())
+ }
+
+ @Test
+ def testConcurrentUpdates(): Unit = {
+ cache = new ExpiringErrorCache(50, mockTime)
+ val numThreads = 10
+ val numUpdatesPerThread = 100
+ val sharedTopics = (1 to 10).map(i => s"shared_topic$i").toArray
+ val latch = new CountDownLatch(numThreads)
+
+ (1 to numThreads).foreach { threadId =>
+ Future {
+ try {
+ val random = new Random()
+ for (i <- 1 to numUpdatesPerThread) {
+ val topic = sharedTopics(random.nextInt(sharedTopics.length))
+ cache.put(topic, s"error_thread${threadId}_update$i", 1000L)
+ }
+ } finally {
+ latch.countDown()
+ }
+ }
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS))
+
+ // Verify all shared topics have some value
+ val errors = cache.getErrorsForTopics(sharedTopics.toSet,
mockTime.milliseconds())
+ sharedTopics.foreach { topic =>
+ assertTrue(errors.contains(topic), s"Topic $topic should have a value")
+ assertTrue(errors(topic).startsWith("error_thread"), s"Value should be
from one of the threads")
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 42cf9a30ff5..bdd62291407 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -168,7 +168,8 @@ class KafkaApisTest extends Logging {
authorizer: Option[Authorizer] = None,
configRepository: ConfigRepository = new MockConfigRepository(),
overrideProperties: Map[String, String] = Map.empty,
- featureVersions: Seq[FeatureVersion] = Seq.empty
+ featureVersions: Seq[FeatureVersion] = Seq.empty,
+ autoTopicCreationManager: Option[AutoTopicCreationManager] = None
): KafkaApis = {
val properties = TestUtils.createBrokerConfig(brokerId)
@@ -194,7 +195,7 @@ class KafkaApisTest extends Logging {
groupCoordinator = groupCoordinator,
txnCoordinator = txnCoordinator,
shareCoordinator = shareCoordinator,
- autoTopicCreationManager = autoTopicCreationManager,
+ autoTopicCreationManager =
autoTopicCreationManager.getOrElse(this.autoTopicCreationManager),
brokerId = brokerId,
config = config,
configRepository = configRepository,
@@ -10887,7 +10888,7 @@ class KafkaApisTest extends Logging {
future.complete(new
StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse,
missingTopics.asJava))
val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
-
verify(autoTopicCreationManager).createStreamsInternalTopics(missingTopics,
requestChannelRequest.context)
+ verify(autoTopicCreationManager).createStreamsInternalTopics(any(), any(),
anyLong())
}
@Test
@@ -10947,6 +10948,61 @@ class KafkaApisTest extends Logging {
)
}
+ @Test
+ def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(util.Map.of(StreamsVersion.FEATURE_NAME,
1.toShort))
+
+ metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
+
+ val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+
+ val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
+ when(groupCoordinator.streamsGroupHeartbeat(
+ requestChannelRequest.context,
+ streamsGroupHeartbeatRequest
+ )).thenReturn(future)
+
+ // Mock AutoTopicCreationManager to return cached errors
+ val mockAutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
+
when(mockAutoTopicCreationManager.getStreamsInternalTopicCreationErrors(ArgumentMatchers.eq(Set("test-topic")),
any()))
+ .thenReturn(Map("test-topic" -> "INVALID_REPLICATION_FACTOR"))
+ // Mock the createStreamsInternalTopics method to do nothing (simulate
topic creation attempt)
+
doNothing().when(mockAutoTopicCreationManager).createStreamsInternalTopics(any(),
any(), anyLong())
+
+ kafkaApis = createKafkaApis(autoTopicCreationManager =
Some(mockAutoTopicCreationManager))
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ // Group coordinator returns MISSING_INTERNAL_TOPICS status and topics to
create
+ val missingTopics = util.Map.of("test-topic", new CreatableTopic())
+ val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+ .setMemberId("member")
+ .setStatus(util.List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+ .setStatusDetail("Internal topics are missing: [test-topic]")
+ ))
+
+ future.complete(new
StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics))
+ val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+
+ assertEquals(Errors.NONE.code, response.data.errorCode())
+ assertEquals(null, response.data.errorMessage())
+
+ // Verify that the cached error was appended to the existing status detail
+ assertEquals(1, response.data.status().size())
+ val status = response.data.status().get(0)
+
assertEquals(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code(),
status.statusCode())
+ assertTrue(status.statusDetail().contains("Internal topics are missing:
[test-topic]"))
+ assertTrue(status.statusDetail().contains("Creation failed: test-topic
(INVALID_REPLICATION_FACTOR)"))
+
+ // Verify that createStreamsInternalTopics was called
+ verify(mockAutoTopicCreationManager).createStreamsInternalTopics(any(),
any(), anyLong())
+
verify(mockAutoTopicCreationManager).getStreamsInternalTopicCreationErrors(ArgumentMatchers.eq(Set("test-topic")),
any())
+ }
+
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {