Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-07 Thread via GitHub


cmccabe commented on PR #14863:
URL: https://github.com/apache/kafka/pull/14863#issuecomment-1845996759

   committed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-07 Thread via GitHub


cmccabe closed pull request #14863: KAFKA-15426: Process and persist directory 
assignments
URL: https://github.com/apache/kafka/pull/14863





Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-07 Thread via GitHub


cmccabe commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1419522832


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment:
   For future reference, we shouldn't check the metadata version (MV) in
   `ControllerApis`, but in `ReplicationControlManager`. `handleDescribeCluster`
   does something different because it's consulting a cache.






Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex);
+                        if (partitionRegistration == null) {
+                            log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                        } else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+                            log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+                        } else {
+                            Optional partitionChangeRecord = new PartitionChangeBuilder(
+                                    partitionRegistration,
+                                    topicId,
+                                    partitionIndex,
+                                    new LeaderAcceptor(clusterControl, partitionRegistration),
+                                    featureControl.metadataVersion(),
+                                    getTopicEffectiveMinIsr(topicName)
+                                )
+                                .setDirectory(brokerId, dirId)
+                                .setDefaultDirProvider(clusterDescriber)
+                                .build();
+                            partitionChangeRecord.ifPresent(records::add);
+
+                            if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment:
   Good point






Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-05 Thread via GitHub


rondagostino commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447


##
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
##
@@ -1139,31 +1139,24 @@ class ControllerApisTest {
   }
 
   @Test
-  def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
+  def testAssignReplicasToDirs(): Unit = {
     val controller = mock(classOf[Controller])
-    val controllerApis = createControllerApis(None, controller)
+    val authorizer = mock(classOf[Authorizer])
+    val controllerApis = createControllerApis(Some(authorizer), controller)
+
+    val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build()
+
+    when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action(
+      AclOperation.CLUSTER_ACTION,
+      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+      1, true, true
+    )))))
+      .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+    when(controller.assignReplicasToDirs(any[ControllerRequestContext], ArgumentMatchers.eq(request.data)))
+      .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())

Review Comment:
   This needs to return a future completed with something like
   `completableFuture.completeExceptionally()`, as opposed to throwing an
   exception directly.
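
A minimal sketch of the difference, assuming the mocked controller method returns a `CompletableFuture`. The class and helper names (`FailedFutureSketch`, `failedFuture`, `describe`) are hypothetical and only illustrate the idea; this is not the code that ended up in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FailedFutureSketch {
    // Hypothetical helper: builds a future that is already failed, instead of
    // throwing from the method that is supposed to return the future.
    static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // Hypothetical caller: the failure surfaces when the future is joined,
    // wrapped in a CompletionException, so async error handling is exercised.
    static String describe(CompletableFuture<String> future) {
        try {
            return "ok: " + future.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future =
            failedFuture(new IllegalStateException("unknown topic or partition"));
        System.out.println(describe(future)); // prints "failed: unknown topic or partition"
    }
}
```

In a Mockito stub this would presumably mean passing such a failed future to `thenReturn(...)` rather than using `thenThrow(...)`, so that the code under test sees the error through the future.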



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);

Review Comment:
   nit: `topicControlInfo` or `topicInfo` might be a better name than 
`topicControl` -- get the "Info" in there somehow.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        List records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+        Set leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex);
+                        if 

Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1415836932


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
         return response;
     }
 
+    ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment:
   I've added a check in `ControllerApis.handleAssignReplicasToDirs` similar to 
`handleDescribeCluster`.






Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-04 Thread via GitHub


pprovenzano commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1414875644


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2016,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {

Review Comment:
   We should check whether the metadata version supports this request and
   return an error if it does not.
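
A minimal sketch of such a gate, under stated assumptions: the `Version` enum, its `supportsDirAssignment` flag, and the `ErrorCode` enum are hypothetical stand-ins rather than Kafka's real metadata version or error types. The `UNSUPPORTED_VERSION` outcome mirrors the name of the earlier test in this thread, `testAssignReplicasToDirsReturnsUnsupportedVersion`.

```java
final class MetadataVersionGateSketch {
    // Hypothetical, simplified metadata versions; not Kafka's real enum.
    enum Version {
        BEFORE_DIR_ASSIGNMENT(false),
        WITH_DIR_ASSIGNMENT(true);

        final boolean supportsDirAssignment;

        Version(boolean supportsDirAssignment) {
            this.supportsDirAssignment = supportsDirAssignment;
        }
    }

    // Hypothetical error codes for the illustration.
    enum ErrorCode { NONE, UNSUPPORTED_VERSION }

    // Reject the request up front when the active version lacks the feature.
    static ErrorCode handleAssignReplicasToDirs(Version current) {
        if (!current.supportsDirAssignment) {
            return ErrorCode.UNSUPPORTED_VERSION;
        }
        // Real request processing would happen here.
        return ErrorCode.NONE;
    }

    public static void main(String[] args) {
        System.out.println(handleAssignReplicasToDirs(Version.BEFORE_DIR_ASSIGNMENT));
        System.out.println(handleAssignReplicasToDirs(Version.WITH_DIR_ASSIGNMENT));
    }
}
```

Checking before any records are generated keeps an unsupported request from producing partial metadata changes.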






Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-01 Thread via GitHub


soarez commented on PR #14863:
URL: https://github.com/apache/kafka/pull/14863#issuecomment-1836233799

   Ready for review. This is still a draft because it depends on
   https://github.com/apache/kafka/pull/14838, so please ignore the first commit.
   
   @cmccabe @rondagostino @pprovenzano 





Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-11-29 Thread via GitHub


soarez commented on PR #14863:
URL: https://github.com/apache/kafka/pull/14863#issuecomment-1831518299

   Depends on #14820 and #14838
   





[PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-11-29 Thread via GitHub


soarez opened a new pull request, #14863:
URL: https://github.com/apache/kafka/pull/14863

   Handle AssignReplicasToDirs requests and persist the resulting metadata
   changes: the new directory assignments and any leader elections they trigger.
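
The per-partition validation order visible in the diff quoted in this thread (unknown topic ID, then unknown partition, then broker not a replica) can be sketched roughly as follows. This is a simplified illustration, not the PR's implementation: the `Outcome` enum, the nested-map model of topics, and the `validate` helper are assumptions made for the example.

```java
import java.util.Map;
import java.util.Set;

final class DirAssignmentValidationSketch {
    // Outcome names mirror the error codes used in the quoted diff.
    enum Outcome { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_OR_FOLLOWER }

    // Hypothetical model: topic id -> (partition index -> replica broker ids).
    static Outcome validate(Map<String, Map<Integer, Set<Integer>>> topics,
                            int brokerId, String topicId, int partitionIndex) {
        Map<Integer, Set<Integer>> partitions = topics.get(topicId);
        if (partitions == null) {
            return Outcome.UNKNOWN_TOPIC_ID;            // topic not known to the controller
        }
        Set<Integer> replicas = partitions.get(partitionIndex);
        if (replicas == null) {
            return Outcome.UNKNOWN_TOPIC_OR_PARTITION;  // partition does not exist
        }
        if (!replicas.contains(brokerId)) {
            return Outcome.NOT_LEADER_OR_FOLLOWER;      // broker does not host this partition
        }
        return Outcome.NONE;                            // assignment can be recorded
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Set<Integer>>> topics =
            Map.of("topic-a", Map.of(0, Set.of(1, 2)));
        System.out.println(validate(topics, 1, "topic-a", 0)); // prints NONE
        System.out.println(validate(topics, 3, "topic-a", 0)); // prints NOT_LEADER_OR_FOLLOWER
    }
}
```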
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

