[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode
hachikuji commented on a change in pull request #10343: URL: https://github.com/apache/kafka/pull/10343#discussion_r610263157 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) { return ControllerResult.of(records, null); } +ControllerResult> +createPartitions(List topics) { +List records = new ArrayList<>(); +List results = new ArrayList<>(); +for (CreatePartitionsTopic topic : topics) { +ApiError apiError = ApiError.NONE; +try { +createPartitions(topic, records); +} catch (ApiException e) { +apiError = ApiError.fromThrowable(e); +} catch (Exception e) { +log.error("Unexpected createPartitions error for {}", topic, e); +apiError = ApiError.fromThrowable(e); +} +results.add(new CreatePartitionsTopicResult(). +setName(topic.name()). +setErrorCode(apiError.error().code()). +setErrorMessage(apiError.message())); +} +return new ControllerResult<>(records, results, true); +} + +void createPartitions(CreatePartitionsTopic topic, + List records) { +Uuid topicId = topicsByName.get(topic.name()); +if (topicId == null) { +throw new UnknownTopicOrPartitionException(); +} +TopicControlInfo topicInfo = topics.get(topicId); +if (topicInfo == null) { +throw new UnknownTopicOrPartitionException(); +} +if (topic.count() == topicInfo.parts.size()) { Review comment: I guess this logic is consistent with the current implementation. It might have been nice to make this an idempotent operation. ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -538,4 +542,65 @@ class ControllerApis(val requestChannel: RequestChannel, } }) } + + def handleCreatePartitions(request: RequestChannel.Request): Unit = { +val future = createPartitions(request.body[CreatePartitionsRequest].data, + authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n)) +future.whenComplete((responses, exception) => { + if (exception != null) { +requestHelper.handleError(request, exception) + } else { +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + val responseData = new CreatePartitionsResponseData(). +setResults(responses). +setThrottleTimeMs(requestThrottleMs) + new CreatePartitionsResponse(responseData) +}) + } +}) + } + + def createPartitions(request: CreatePartitionsRequestData, + hasClusterAuth: Boolean, + getCreatableTopics: Iterable[String] => Set[String]) + : CompletableFuture[util.List[CreatePartitionsTopicResult]] = { +val responses = new util.ArrayList[CreatePartitionsTopicResult]() +val duplicateTopicNames = new util.HashSet[String]() +val topicNames = new util.HashSet[String]() +request.topics().forEach { + topic => +if (!topicNames.add(topic.name())) { + duplicateTopicNames.add(topic.name()) +} +} +duplicateTopicNames.forEach { topicName => + responses.add(new CreatePartitionsTopicResult(). +setName(topicName). +setErrorCode(INVALID_REQUEST.code()). +setErrorMessage("Duplicate topic name.")) +topicNames.remove(topicName) +} +val authorizedTopicNames = { + if (hasClusterAuth) { +topicNames.asScala + } else { +getCreatableTopics(topicNames.asScala) + } +} +val topics = new util.ArrayList[CreatePartitionsTopic] +topicNames.forEach { Review comment: nit: `topicNames.forEach { topicName =>` ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) { return ControllerResult.of(records, null); } +ControllerResult> +createPartitions(List topics) { +List records = new ArrayList<>(); +List results = new ArrayList<>(); +for (CreatePartitionsTopic topic : topics) { +ApiError apiError = ApiError.NONE; +try { +createPartitions(topic, records); +} catch (ApiException e) { +apiError = ApiError.fromThrowable(e); +} catch (Exception e) { +log.error("Unexpected createPartitions error for {}", topic, e); +apiError = ApiError.fromThrowable(e
[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode
hachikuji commented on a change in pull request #10343: URL: https://github.com/apache/kafka/pull/10343#discussion_r609081390 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1007,6 +1014,112 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) { return ControllerResult.of(records, null); } +ControllerResult> +createPartitions(List topics) { +List records = new ArrayList<>(); +List results = new ArrayList<>(); +for (CreatePartitionsTopic topic : topics) { +ApiError apiError = ApiError.NONE; +try { +createPartitions(topic, records); +} catch (ApiException e) { +apiError = ApiError.fromThrowable(e); +} catch (Exception e) { +log.error("Unexpected createPartitions error for {}", topic, e); +apiError = ApiError.fromThrowable(e); +} +results.add(new CreatePartitionsTopicResult(). +setName(topic.name()). +setErrorCode(apiError.error().code()). +setErrorMessage(apiError.message())); +} +return new ControllerResult<>(records, results, true); +} + +void createPartitions(CreatePartitionsTopic topic, + List records) { +Uuid topicId = topicsByName.get(topic.name()); +if (topicId == null) { +throw new UnknownTopicOrPartitionException(); +} +TopicControlInfo topicInfo = topics.get(topicId); +if (topicInfo == null) { +throw new UnknownTopicOrPartitionException(); +} +if (topic.count() == topicInfo.parts.size()) { +throw new InvalidPartitionsException("Topic already has " + +topicInfo.parts.size() + " partition(s)."); +} else if (topic.count() < topicInfo.parts.size()) { +throw new InvalidPartitionsException("The topic " + topic.name() + " currently " + +"has " + topicInfo.parts.size() + " partition(s); " + topic.count() + +" would not be an increase."); +} +int additional = topic.count() - topicInfo.parts.size(); +if (topic.assignments() != null) { +if (topic.assignments().size() != additional) { +throw new InvalidReplicaAssignmentException("Attempted to add " + additional + +" additional partition(s), but only " + topic.assignments().size() + +" assignment(s) were specified."); +} +} +Iterator iterator = topicInfo.parts.values().iterator(); +if (!iterator.hasNext()) { +throw new UnknownServerException("Invalid state: topic " + topic.name() + +" appears to have no partitions."); +} +PartitionControlInfo partitionInfo = iterator.next(); +if (partitionInfo.replicas.length > Short.MAX_VALUE) { +throw new UnknownServerException("Invalid replication factor " + +partitionInfo.replicas.length + ": expected a number less than 65536."); +} +short replicationFactor = (short) partitionInfo.replicas.length; +int startPartitionId = topicInfo.parts.size(); + +List> placements = null; +if (topic.assignments() != null) { +placements = new ArrayList<>(); +for (CreatePartitionsAssignment assignment : topic.assignments()) { Review comment: nit: it would improve readability to factor out some functions for some of the work here. Here we can have a separate function with a nice name for building the assignments ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -538,4 +542,57 @@ class ControllerApis(val requestChannel: RequestChannel, } }) } + + def handleCreatePartitions(request: RequestChannel.Request): Unit = { +val responses = createPartitions(request.body[CreatePartitionsRequest].data, + authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n)) +requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new CreatePartitionsResponseData(). +setResults(responses). +setThrottleTimeMs(throttleTimeMs) + new CreatePartitionsResponse(responseData) +}) + } + + def createPartitions(request: CreatePartitionsRequestData, + hasClusterAuth: Boolean, + getCreatableTopics: Iterable[String] => Set[String]): util.List[CreatePartitionsTopicResult] = { +val responses = new util.ArrayList[CreatePartitionsTopicResult]() +val duplicateTopicNames = new util.HashSet[String]() +val topicNames = new util.HashSet[String]() +request.topics().forEach { +