This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3067f15caf6 KAFKA-19596: Improve visibility when topic auto-creation
fails (#20340)
3067f15caf6 is described below
commit 3067f15caf624ef854369d9f7e0980fc5c210626
Author: Robert Young <[email protected]>
AuthorDate: Thu Aug 14 14:47:12 2025 +1200
KAFKA-19596: Improve visibility when topic auto-creation fails (#20340)
Log a warning for each topic that fails to be created during automatic
topic creation. This makes the underlying cause more visible so users
can take action.
Previously, at the default log level, you could only see log messages
saying that the broker was attempting to auto-create topics. If the
creation failed, the failure was logged only at debug level.
Signed-off-by: Robert Young <[email protected]>
Reviewers: Luke Chen <[email protected]>, Kuan-Po Tseng
<[email protected]>
---
.../scala/kafka/server/AutoTopicCreationManager.scala | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 25b27302d3a..7b98c8c16fb 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicConfig, CreatableTopicConfigCollection}
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext,
RequestHeader}
+import org.apache.kafka.common.requests.{CreateTopicsRequest,
CreateTopicsResponse, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@@ -138,6 +138,19 @@ class DefaultAutoTopicCreationManager(
} else if (response.versionMismatch() != null) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with
invalid version exception")
} else {
+ if (response.hasResponse) {
+ response.responseBody() match {
+ case createTopicsResponse: CreateTopicsResponse =>
+ createTopicsResponse.data().topics().forEach(topicResult => {
+ val error = Errors.forCode(topicResult.errorCode)
+ if (error != Errors.NONE) {
+ warn(s"Auto topic creation failed for ${topicResult.name}
with error '${error.name}': ${topicResult.errorMessage}")
+ }
+ })
+ case other =>
+ warn(s"Auto topic creation request received unexpected
response type: ${other.getClass.getSimpleName}")
+ }
+ }
debug(s"Auto topic creation completed for ${creatableTopics.keys}
with response ${response.responseBody}.")
}
}