rondagostino commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1088113461
##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel,
val describableTopicNames =
getDescribableTopics.apply(allowedTopicNames).asJava
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
+ var totalRequestedPartitionCount = 0
+ val defaultPartitionCont = config.numPartitions.intValue()
while (iterator.hasNext) {
val creatableTopic = iterator.next()
if (duplicateTopicNames.contains(creatableTopic.name()) ||
!authorizedTopicNames.contains(creatableTopic.name())) {
iterator.remove()
- }
- }
- controller.createTopics(context, effectiveRequest,
describableTopicNames).thenApply { response =>
+ } else {
+ if (!creatableTopic.assignments().isEmpty) {
+ totalRequestedPartitionCount += creatableTopic.assignments().size()
+ } else if (creatableTopic.numPartitions() > 0)
+ totalRequestedPartitionCount += creatableTopic.numPartitions()
+ else
+ totalRequestedPartitionCount += defaultPartitionCont
+ }
+ }
+ val future = try {
+ if (!effectiveRequest.validateOnly())
+ controllerMutationQuota.record(totalRequestedPartitionCount)
+ controller.createTopics(context, effectiveRequest, describableTopicNames)
+ } catch {
+ case e: ThrottlingQuotaExceededException =>
+ val apiError = ApiError.fromThrowable(e)
+ val data = new CreateTopicsResponseData
+ effectiveRequest.topics().forEach(topic =>
+ data.topics.add(new
CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message)))
+ data.setThrottleTimeMs(e.throttleTimeMs())
+ CompletableFuture.completedFuture(data)
+ }
Review Comment:
I actually believe the code is correct as written. We mark all of the
requested topics in `effectiveRequest` with the throttle exception, and then we
fall through to mark all of the topics that were not in the effective request
with the appropriate error codes (duplicate topics get marked as an invalid
request, and unauthorized topics get marked as an authorization failure).
Evidently this is not obvious, though! I'll add a comment describing this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]