hachikuji commented on a change in pull request #10142:
URL: https://github.com/apache/kafka/pull/10142#discussion_r607224516
##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)
- channelManager.get.sendRequest(createTopicsRequest, new
ControllerRequestCompletionHandler {
+ val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
clearInflightRequests(creatableTopics)
}
override def onComplete(response: ClientResponse): Unit = {
- debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
+ debug(s"Auto topic creation completed for ${creatableTopics.keys} with
response ${response.responseBody.toString}.")
clearInflightRequests(creatableTopics)
}
- })
+ }
+
+ val channelManager = this.channelManager.getOrElse {
+ throw new IllegalStateException("Channel manager must be defined in
order to send CreateTopic requests.")
+ }
+
+ val request = metadataRequestContext.map { context =>
+ val requestVersion =
+ channelManager.controllerApiVersions() match {
+ case None =>
+ // We will rely on the Metadata request to be retried in the case
+ // that the latest version is not usable by the controller.
+ ApiKeys.CREATE_TOPICS.latestVersion()
+ case Some(nodeApiVersions) =>
+ nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
+ }
+
+ // Borrow client information such as client id and correlation id from
the original request,
+ // in order to correlate the create request with the original metadata
request.
+ val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+ requestVersion,
+ context.clientId,
+ context.correlationId)
+ ForwardingManager.buildEnvelopeRequest(context,
+
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
Review comment:
@dengziming It's not a bad idea. We could even simplify it a little
since the API key and version can be obtained from the request. I tend to agree
that this is kind of a niche usage though, so I'm not sure it calls for the
generality. Perhaps you could submit a follow-up once this is merged and we can
see what it looks like.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]