[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

2021-04-08 Thread GitBox


hachikuji commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r610263157



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) {
         return ControllerResult.of(records, null);
     }
 
+    ControllerResult<List<CreatePartitionsTopicResult>>
+            createPartitions(List<CreatePartitionsTopic> topics) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<CreatePartitionsTopicResult> results = new ArrayList<>();
+        for (CreatePartitionsTopic topic : topics) {
+            ApiError apiError = ApiError.NONE;
+            try {
+                createPartitions(topic, records);
+            } catch (ApiException e) {
+                apiError = ApiError.fromThrowable(e);
+            } catch (Exception e) {
+                log.error("Unexpected createPartitions error for {}", topic, e);
+                apiError = ApiError.fromThrowable(e);
+            }
+            results.add(new CreatePartitionsTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
+        }
+        return new ControllerResult<>(records, results, true);
+    }
+
+    void createPartitions(CreatePartitionsTopic topic,
+                          List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic.name());
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        if (topic.count() == topicInfo.parts.size()) {

Review comment:
   I guess this logic is consistent with the current implementation. It 
might have been nice to make this an idempotent operation.
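
   For illustration only, a minimal sketch of the idempotent variant being suggested here, reusing the `topic.count()` / `topicInfo.parts.size()` comparison from the diff above; the early return is the reviewer's idea, not what the PR implements:

   ```java
   // Hypothetical tweak (not in the PR): treat a request whose count already equals
   // the current partition count as a no-op, so retries of the same request succeed
   // instead of failing with InvalidPartitionsException.
   if (topic.count() == topicInfo.parts.size()) {
       return; // nothing to do: the topic already has the requested number of partitions
   } else if (topic.count() < topicInfo.parts.size()) {
       throw new InvalidPartitionsException("The topic " + topic.name() + " currently has " +
           topicInfo.parts.size() + " partition(s); " + topic.count() +
           " would not be an increase.");
   }
   ```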

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -538,4 +542,65 @@ class ControllerApis(val requestChannel: RequestChannel,
 }
   })
   }
+
+  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+    val future = createPartitions(request.body[CreatePartitionsRequest].data,
+      authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+    future.whenComplete((responses, exception) => {
+      if (exception != null) {
+        requestHelper.handleError(request, exception)
+      } else {
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+          val responseData = new CreatePartitionsResponseData().
+            setResults(responses).
+            setThrottleTimeMs(requestThrottleMs)
+          new CreatePartitionsResponse(responseData)
+        })
+      }
+    })
+  }
+
+  def createPartitions(request: CreatePartitionsRequestData,
+                       hasClusterAuth: Boolean,
+                       getCreatableTopics: Iterable[String] => Set[String])
+                       : CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
+    val responses = new util.ArrayList[CreatePartitionsTopicResult]()
+    val duplicateTopicNames = new util.HashSet[String]()
+    val topicNames = new util.HashSet[String]()
+    request.topics().forEach {
+      topic =>
+        if (!topicNames.add(topic.name())) {
+          duplicateTopicNames.add(topic.name())
+        }
+    }
+    duplicateTopicNames.forEach { topicName =>
+      responses.add(new CreatePartitionsTopicResult().
+        setName(topicName).
+        setErrorCode(INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."))
+      topicNames.remove(topicName)
+    }
+    val authorizedTopicNames = {
+      if (hasClusterAuth) {
+        topicNames.asScala
+      } else {
+        getCreatableTopics(topicNames.asScala)
+      }
+    }
+    val topics = new util.ArrayList[CreatePartitionsTopic]
+    topicNames.forEach {

Review comment:
   nit: `topicNames.forEach { topicName =>`
   

[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

2021-04-07 Thread GitBox


hachikuji commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r609081390



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1007,6 +1014,112 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) {
         return ControllerResult.of(records, null);
     }
 
+    ControllerResult<List<CreatePartitionsTopicResult>>
+            createPartitions(List<CreatePartitionsTopic> topics) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<CreatePartitionsTopicResult> results = new ArrayList<>();
+        for (CreatePartitionsTopic topic : topics) {
+            ApiError apiError = ApiError.NONE;
+            try {
+                createPartitions(topic, records);
+            } catch (ApiException e) {
+                apiError = ApiError.fromThrowable(e);
+            } catch (Exception e) {
+                log.error("Unexpected createPartitions error for {}", topic, e);
+                apiError = ApiError.fromThrowable(e);
+            }
+            results.add(new CreatePartitionsTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
+        }
+        return new ControllerResult<>(records, results, true);
+    }
+
+    void createPartitions(CreatePartitionsTopic topic,
+                          List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic.name());
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        if (topic.count() == topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("Topic already has " +
+                topicInfo.parts.size() + " partition(s).");
+        } else if (topic.count() < topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
+                "has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
+                " would not be an increase.");
+        }
+        int additional = topic.count() - topicInfo.parts.size();
+        if (topic.assignments() != null) {
+            if (topic.assignments().size() != additional) {
+                throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
+                    " additional partition(s), but only " + topic.assignments().size() +
+                    " assignment(s) were specified.");
+            }
+        }
+        Iterator<PartitionControlInfo> iterator = topicInfo.parts.values().iterator();
+        if (!iterator.hasNext()) {
+            throw new UnknownServerException("Invalid state: topic " + topic.name() +
+                " appears to have no partitions.");
+        }
+        PartitionControlInfo partitionInfo = iterator.next();
+        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
+            throw new UnknownServerException("Invalid replication factor " +
+                partitionInfo.replicas.length + ": expected a number less than 65536.");
+        }
+        short replicationFactor = (short) partitionInfo.replicas.length;
+        int startPartitionId = topicInfo.parts.size();
+
+        List<List<Integer>> placements = null;
+        if (topic.assignments() != null) {
+            placements = new ArrayList<>();
+            for (CreatePartitionsAssignment assignment : topic.assignments()) {
Review comment:
   nit: it would improve readability to factor out some functions for some 
of the work here. Here we can have a separate function with a nice name for 
building the assignments
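
   For illustration only, a rough sketch of the kind of extraction being suggested: pulling the assignment-building loop out of `createPartitions` into its own method. The helper name `placementsFromAssignments` and the exact validation are assumptions for the sketch, not code from the PR; it relies on the `CreatePartitionsAssignment`, `InvalidReplicaAssignmentException`, and `List<List<Integer>>` types already visible in the diff above.

   ```java
   // Hypothetical helper (name and validation are illustrative): converts the manually
   // requested assignments into the placement lists consumed by the rest of
   // createPartitions, keeping that work behind a descriptive name.
   private static List<List<Integer>> placementsFromAssignments(CreatePartitionsTopic topic,
                                                                short replicationFactor) {
       if (topic.assignments() == null) {
           return null;
       }
       List<List<Integer>> placements = new ArrayList<>();
       for (CreatePartitionsAssignment assignment : topic.assignments()) {
           if (assignment.brokerIds().size() != replicationFactor) {
               throw new InvalidReplicaAssignmentException("Each manual assignment must name " +
                   replicationFactor + " broker(s), but one named " +
                   assignment.brokerIds().size() + ".");
           }
           placements.add(new ArrayList<>(assignment.brokerIds()));
       }
       return placements;
   }
   ```

   The call site could then shrink to something like `List<List<Integer>> placements = placementsFromAssignments(topic, replicationFactor);`.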

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -538,4 +542,57 @@ class ControllerApis(val requestChannel: RequestChannel,
 }
   })
   }
+
+  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+    val responses = createPartitions(request.body[CreatePartitionsRequest].data,
+      authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new CreatePartitionsResponseData().
+        setResults(responses).
+        setThrottleTimeMs(throttleTimeMs)
+      new CreatePartitionsResponse(responseData)
+    })
+  }
+
+  def createPartitions(request: CreatePartitionsRequestData,
+                       hasClusterAuth: Boolean,
+                       getCreatableTopics: Iterable[String] => Set[String]): util.List[CreatePartitionsTopicResult] = {
+    val responses = new util.ArrayList[CreatePartitionsTopicResult]()
+    val duplicateTopicNames = new util.HashSet[String]()
+    val topicNames = new util.HashSet[String]()
+    request.topics().forEach {