Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
cmccabe commented on PR #14863: URL: https://github.com/apache/kafka/pull/14863#issuecomment-1845996759 committed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
cmccabe closed pull request #14863: KAFKA-15426: Process and persist directory assignments URL: https://github.com/apache/kafka/pull/14863
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
cmccabe commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1419522832

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment: For future reference, we shouldn't check MV in ControllerApis, but in `ReplicationControlManager`. `handleDescribeCluster` does something different because it's consulting a cache.
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex);
+                        if (partitionRegistration == null) {
+                            log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                        } else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+                            log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+                        } else {
+                            Optional partitionChangeRecord = new PartitionChangeBuilder(
+                                    partitionRegistration,
+                                    topicId,
+                                    partitionIndex,
+                                    new LeaderAcceptor(clusterControl, partitionRegistration),
+                                    featureControl.metadataVersion(),
+                                    getTopicEffectiveMinIsr(topicName)
+                                )
+                                .setDirectory(brokerId, dirId)
+                                .setDefaultDirProvider(clusterDescriber)
+                                .build();
+                            partitionChangeRecord.ifPresent(records::add);
+
+                            if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment: Good point
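The per-partition checks in the hunk quoted above run in a fixed order: unknown topic ID, then unknown partition, then "this broker is not a replica". The following is a minimal, self-contained sketch of just that ordering, not the PR's code. The `AssignmentError` enum, the `AssignmentValidationSketch` class, and the nested-map model of topics are hypothetical simplifications standing in for `Errors`, `TopicControlInfo`, and `PartitionRegistration`.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the subset of Kafka's Errors used in the quoted hunk.
enum AssignmentError { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_OR_FOLLOWER }

class AssignmentValidationSketch {
    // Simplified model (an assumption): topic ID -> (partition index -> replica broker IDs).
    static AssignmentError validate(Map<String, Map<Integer, List<Integer>>> topics,
                                    String topicId, int partitionIndex, int brokerId) {
        Map<Integer, List<Integer>> partitions = topics.get(topicId);
        if (partitions == null) {
            // The topic ID is not known to the controller.
            return AssignmentError.UNKNOWN_TOPIC_ID;
        }
        List<Integer> replicas = partitions.get(partitionIndex);
        if (replicas == null) {
            // The topic exists but has no such partition.
            return AssignmentError.UNKNOWN_TOPIC_OR_PARTITION;
        }
        if (!replicas.contains(brokerId)) {
            // The requesting broker does not host a replica of this partition.
            return AssignmentError.NOT_LEADER_OR_FOLLOWER;
        }
        return AssignmentError.NONE;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, List<Integer>>> topics = Map.of("topic-a", Map.of(0, List.of(1, 2)));
        System.out.println(validate(topics, "topic-a", 0, 1));
        System.out.println(validate(topics, "topic-b", 0, 1));
        System.out.println(validate(topics, "topic-a", 7, 1));
        System.out.println(validate(topics, "topic-a", 0, 9));
    }
}
```

In the quoted hunk, a topic-level error is copied to every partition of that topic (`Errors partitionError = topicError;`), and only the fully valid case goes on to build a partition change record.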
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
rondagostino commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447

## core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
## @@ -1139,31 +1139,24 @@ class ControllerApisTest {
   }

   @Test
-  def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
+  def testAssignReplicasToDirs(): Unit = {
     val controller = mock(classOf[Controller])
-    val controllerApis = createControllerApis(None, controller)
+    val authorizer = mock(classOf[Authorizer])
+    val controllerApis = createControllerApis(Some(authorizer), controller)
+
+    val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build()
+
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action(
+      AclOperation.CLUSTER_ACTION,
+      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+      1, true, true
+    )
+      .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+    when(controller.assignReplicasToDirs(any[ControllerRequestContext], ArgumentMatchers.eq(request.data)))
+      .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())

Review Comment: Need to return something like `completableFuture.completeExceptionally()` here as opposed to throwing an exception directly.
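To illustrate the distinction this comment draws, here is a minimal, self-contained Java sketch of an asynchronous method reporting failure through an exceptionally completed `CompletableFuture` rather than by throwing at call time. `AsyncAssigner`, `FailedFutureSketch`, and `failedFuture` are hypothetical names for illustration only; they are not Kafka or Mockito APIs, and `IllegalStateException` stands in for the Kafka exception used in the test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for an asynchronous controller method; not Kafka's Controller interface.
interface AsyncAssigner {
    CompletableFuture<String> assign(String request);
}

class FailedFutureSketch {
    // Builds a future that is already completed exceptionally. A stub can
    // return this instead of throwing from the method call itself.
    static <T> CompletableFuture<T> failedFuture(Throwable error) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(error);
        return future;
    }

    // Caller-side handling: the failure surfaces when the future is read,
    // wrapped in an ExecutionException whose cause is the original error.
    static String describeOutcome(AsyncAssigner assigner, String request) {
        try {
            return "ok: " + assigner.assign(request).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        AsyncAssigner failing = request -> failedFuture(new IllegalStateException("unknown partition"));
        System.out.println(describeOutcome(failing, "req")); // prints "failed: unknown partition"
    }
}
```

In a mock-based test this would presumably mean stubbing with `thenReturn(...)` and a failed future in place of `thenThrow(...)`, so that the code under test exercises its future-handling path.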
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);

Review Comment: nit: `topicControlInfo` or `topicInfo` might be a better name than `topicControl` -- get the "Info" in there somehow.
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex);
+                        if
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1415836932

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment: I've added a check in `ControllerApis.handleAssignReplicasToDirs` similar to `handleDescribeCluster`.
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
pprovenzano commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1414875644

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
## @@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }

+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment: We should check if the metadata version supports this request and return an error if not.
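A rough sketch of the kind of gate this comment asks for is shown below, using simplified stand-in types. It is not the PR's implementation: `SketchError`, `MetadataVersionGateSketch`, the integer feature level, and the threshold value are all assumptions made for illustration, and the thread does not state which error code the real check returns (`UNSUPPORTED_VERSION` is assumed here).

```java
// Hypothetical stand-in for Kafka's Errors enum, reduced to what the sketch needs.
enum SketchError { NONE, UNSUPPORTED_VERSION }

class MetadataVersionGateSketch {
    // Assumed feature level at which directory assignment becomes available.
    // The number is illustrative only, not a real metadata version.
    static final int MIN_LEVEL_FOR_DIR_ASSIGNMENT = 3;

    // Returns an error when the active metadata version predates the feature.
    static SketchError checkDirAssignmentSupported(int currentFeatureLevel) {
        return currentFeatureLevel >= MIN_LEVEL_FOR_DIR_ASSIGNMENT
                ? SketchError.NONE
                : SketchError.UNSUPPORTED_VERSION;
    }

    // The gate runs before any request processing, so an unsupported request
    // produces an error response and no metadata changes.
    static String handleAssignReplicasToDirs(int currentFeatureLevel, String request) {
        SketchError error = checkDirAssignmentSupported(currentFeatureLevel);
        if (error != SketchError.NONE) {
            return "error=" + error;
        }
        return "processed " + request;
    }

    public static void main(String[] args) {
        System.out.println(handleAssignReplicasToDirs(2, "req")); // prints "error=UNSUPPORTED_VERSION"
        System.out.println(handleAssignReplicasToDirs(3, "req")); // prints "processed req"
    }
}
```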
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on PR #14863: URL: https://github.com/apache/kafka/pull/14863#issuecomment-1836233799 Ready for review. Draft because it depends on https://github.com/apache/kafka/pull/14838 so ignore the first commit. @cmccabe @rondagostino @pprovenzano
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on PR #14863: URL: https://github.com/apache/kafka/pull/14863#issuecomment-1831518299 Depends on #14820 and #14838
[PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez opened a new pull request, #14863: URL: https://github.com/apache/kafka/pull/14863

Handle AssignReplicasToDirs requests, persist metadata changes with new directory assignments and possible leader elections.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)