showuon commented on code in PR #20485:
URL: https://github.com/apache/kafka/pull/20485#discussion_r2663595954


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1215,7 +1215,23 @@ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int
             } else {
                 totalPartitions += topic.assignments().size();
             }
+        }
 
+        if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
+            throw new PolicyViolationException("Excessively large number of partitions per request.");
+        }
+    }
+
+    static void validateTotalNumberOfPartitions(List<CreatePartitionsTopic> topics) {
+        int totalPartitions = 0;
+        for (CreatePartitionsTopic topic: topics) {
+            if (topic.assignments() == null || topic.assignments().isEmpty()) {
+                if (topic.count() > 0) {
+                    totalPartitions += topic.count();
+                }
+            } else {
+                totalPartitions += topic.assignments().size();
+            }

Review Comment:
   Is this right? For `CreatePartitions`, `topic.count()` is the desired total number of partitions, not the number being added. If a topic already has 9999 partitions and 1 more is to be added, `topic.count()` will be set to 10000, which exceeds the limit, even though only 1 partition record needs to be created in this case. If we really want to limit the size for createPartitions, I think we should count the number of `to be created` partitions instead of relying on the topic count alone. WDYT?
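
   A rough sketch of what counting the `to be created` partitions could look like (not part of the PR; `currentPartitionCount` is a hypothetical lookup for a topic's existing partition count, and `MAX_PARTITIONS_PER_BATCH` / `PolicyViolationException` are the same ones already used in the diff above):

```java
import java.util.List;
import java.util.function.ToIntFunction;

import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;

// Sketch only: count the partitions this request would actually create
// (the increase over the current count), not the requested total.
static void validateNumberOfCreatedPartitions(
    List<CreatePartitionsTopic> topics,
    ToIntFunction<String> currentPartitionCount  // hypothetical lookup by topic name
) {
    int toBeCreated = 0;
    for (CreatePartitionsTopic topic : topics) {
        if (topic.assignments() == null || topic.assignments().isEmpty()) {
            // topic.count() is the desired total, so only the increase counts.
            int delta = topic.count() - currentPartitionCount.applyAsInt(topic.name());
            if (delta > 0) {
                toBeCreated += delta;
            }
        } else {
            // Each explicit assignment corresponds to one newly created partition.
            toBeCreated += topic.assignments().size();
        }
    }
    if (toBeCreated > MAX_PARTITIONS_PER_BATCH) {
        throw new PolicyViolationException("Excessively large number of partitions per request.");
    }
}
```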


