cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667726466
##########
File path:
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
}
}
+ PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+ int partitionId,
+ PartitionControlInfo
partition,
+ int[] newIsr,
+ Function<Integer,
Boolean> isAcceptableLeader) {
+ PartitionChangeRecord record = new PartitionChangeRecord().
+ setTopicId(topic.id).
+ setPartitionId(partitionId);
+ int[] newReplicas = partition.replicas;
+ if (partition.isrChangeCompletesReassignment(newIsr)) {
+ if (partition.addingReplicas.length > 0) {
+ record.setAddingReplicas(Collections.emptyList());
+ }
+ if (partition.removingReplicas.length > 0) {
+ record.setRemovingReplicas(Collections.emptyList());
+ newIsr = Replicas.copyWithout(newIsr,
partition.removingReplicas);
+ newReplicas = Replicas.copyWithout(partition.replicas,
partition.removingReplicas);
+ }
+ }
+ int newLeader;
+ if (Replicas.contains(newIsr, partition.leader)) {
+ // If the current leader is good, don't change.
+ newLeader = partition.leader;
+ } else {
+ // Choose a new leader.
+ boolean uncleanOk =
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+ newLeader = bestLeader(newReplicas, newIsr, uncleanOk,
isAcceptableLeader);
+ }
+ if (!electionWasClean(newLeader, newIsr)) {
+ // After an unclean leader election, the ISR is reset to just the new leader.
+ newIsr = new int[] {newLeader};
+ } else if (newIsr.length == 0) {
+ // We never want to shrink the ISR to size 0.
+ newIsr = partition.isr;
+ }
+ if (newLeader != partition.leader) record.setLeader(newLeader);
+ if (!Arrays.equals(newIsr, partition.isr)) {
+ record.setIsr(Replicas.toList(newIsr));
+ }
+ if (!Arrays.equals(newReplicas, partition.replicas)) {
+ record.setReplicas(Replicas.toList(newReplicas));
+ }
+ return record;
+ }
+
+ ControllerResult<AlterPartitionReassignmentsResponseData>
+ alterPartitionReassignments(AlterPartitionReassignmentsRequestData
request) {
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ AlterPartitionReassignmentsResponseData result =
+ new
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+ int successfulAlterations = 0, totalAlterations = 0;
+ for (ReassignableTopic topic : request.topics()) {
+ ReassignableTopicResponse topicResponse = new
ReassignableTopicResponse().
+ setName(topic.name());
+ for (ReassignablePartition partition : topic.partitions()) {
+ ApiError error = ApiError.NONE;
+ try {
+ alterPartitionReassignment(topic.name(), partition,
records);
+ successfulAlterations++;
+ } catch (Throwable e) {
+ log.info("Unable to alter partition reassignment for " +
+ topic.name() + ":" + partition.partitionIndex() + "
because " +
+ "of an " + e.getClass().getSimpleName() + " error: " +
e.getMessage());
+ error = ApiError.fromThrowable(e);
+ }
+ totalAlterations++;
+ topicResponse.partitions().add(new
ReassignablePartitionResponse().
+ setPartitionIndex(partition.partitionIndex()).
+ setErrorCode(error.error().code()).
+ setErrorMessage(error.message()));
+ }
+ result.responses().add(topicResponse);
+ }
+ log.info("Successfully altered {} out of {} partition
reassignment(s).",
+ successfulAlterations, totalAlterations);
+ return ControllerResult.atomicOf(records, result);
+ }
+
+ void alterPartitionReassignment(String topicName,
+ ReassignablePartition partition,
+ List<ApiMessageAndVersion> records) {
+ Uuid topicId = topicsByName.get(topicName);
+ if (topicId == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find a topic
" +
+ "named " + topicName + ".");
+ }
+ TopicControlInfo topicInfo = topics.get(topicId);
+ if (topicInfo == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find a topic
" +
+ "with ID " + topicId + ".");
+ }
+ TopicIdPartition part = new TopicIdPartition(topicId,
partition.partitionIndex());
+ PartitionControlInfo partitionInfo =
topicInfo.parts.get(partition.partitionIndex());
+ if (partitionInfo == null) {
+ throw new UnknownTopicOrPartitionException("Unable to find
partition " +
+ topicName + ":" + partition.partitionIndex() + ".");
+ }
+ if (partition.replicas() == null) {
+ cancelPartitionReassignment(topicName, part, partitionInfo,
records);
+ } else {
+ changePartitionReassignment(topicName, part, partitionInfo,
partition, records);
+ }
+ }
+
+ void cancelPartitionReassignment(String topicName,
+ TopicIdPartition topicIdPartition,
+ PartitionControlInfo partition,
+ List<ApiMessageAndVersion> records) {
+ if (!partition.isReassigning()) {
+ throw new
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+ }
+ PartitionChangeRecord record = new PartitionChangeRecord().
+ setTopicId(topicIdPartition.topicId()).
+ setPartitionId(topicIdPartition.partitionId());
+ if (partition.removingReplicas.length > 0) {
+ record.setRemovingReplicas(Collections.emptyList());
+ }
+ if (partition.addingReplicas.length > 0) {
+ record.setAddingReplicas(Collections.emptyList());
+ }
+ RemovingAndAddingReplicas removingAndAddingReplicas =
+ new RemovingAndAddingReplicas(partition.removingReplicas,
partition.addingReplicas);
+ List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+ List<Integer> currentIsr = Replicas.toList(partition.isr);
+ List<Integer> revertedReplicas = removingAndAddingReplicas.
+ calculateRevertedReplicas(currentReplicas, currentIsr);
+ if (!revertedReplicas.equals(currentReplicas)) {
+ record.setReplicas(revertedReplicas);
+ List<Integer> newIsr = new ArrayList<>();
+ for (int replica : partition.isr) {
+ if (revertedReplicas.contains(replica)) {
+ newIsr.add(replica);
+ }
+ }
+ if (!newIsr.equals(currentIsr)) {
+ if (!newIsr.contains(partition.leader)) {
+ int newLeader =
bestLeader(Replicas.toArray(revertedReplicas),
+ Replicas.toArray(newIsr),
+
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+ r -> clusterControl.unfenced(r));
+ if (!electionWasClean(newLeader,
Replicas.toArray(newIsr))) {
+ newIsr = Collections.singletonList(newLeader);
+ }
+ record.setLeader(newLeader);
+ }
+ record.setIsr(newIsr);
+ }
+ }
+ records.add(new ApiMessageAndVersion(record, (short) 0));
+ }
+
+ void changePartitionReassignment(String topicName,
+ TopicIdPartition part,
+ PartitionControlInfo partitionInfo,
+ ReassignablePartition partition,
+ List<ApiMessageAndVersion> records) {
+ // Check that the requested partition assignment is valid.
+ validateManualPartitionAssignment(partition.replicas(),
OptionalInt.empty());
+ // Calculate the replicas to add and remove.
+ List<Integer> currentReplicas =
Replicas.toList(partitionInfo.replicas);
+ RemovingAndAddingReplicas removingAndAdding =
+ RemovingAndAddingReplicas.forTarget(currentReplicas,
partition.replicas());
+ PartitionChangeRecord record = new PartitionChangeRecord().
+ setTopicId(part.topicId()).
+ setPartitionId(part.partitionId());
+ List<Integer> removing = removingAndAdding.removingAsList();
+ if (!removing.isEmpty()) record.setRemovingReplicas(removing);
+ List<Integer> adding = removingAndAdding.addingAsList();
+ if (!adding.isEmpty()) record.setAddingReplicas(adding);
+
+ // Calculate the merged replica list. This may involve reordering existing
+ // replicas.
+ List<Integer> newReplicas = removingAndAdding.
+ calculateMergedReplicas(currentReplicas, partition.replicas());
+ PartitionControlInfo nextPartitionInfo = partitionInfo.merge(record);
+ if
(nextPartitionInfo.isrChangeCompletesReassignment(nextPartitionInfo.isr)) {
+ // Handle partition assignments which must be completed immediately.
+ // These assignments don't add any replicas, and don't remove replicas critical
+ // to maintaining a non-empty ISR.
+ record.setRemovingReplicas(null);
+ record.setAddingReplicas(null);
+ int[] newReplicasArray =
Replicas.copyWithout(nextPartitionInfo.replicas,
+ nextPartitionInfo.removingReplicas);
+ newReplicas = Replicas.toList(newReplicasArray);
+ int[] newIsr = Replicas.copyWithout(nextPartitionInfo.isr,
+ nextPartitionInfo.removingReplicas);
+ if (!Arrays.equals(nextPartitionInfo.isr, newIsr)) {
+ // Check if we need to elect a new leader.
+ if (!Replicas.contains(newIsr, partitionInfo.leader)) {
+ int newLeader = bestLeader(newReplicasArray, newIsr,
+
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+ r -> clusterControl.unfenced(r));
+ if (!electionWasClean(newLeader, newIsr)) {
+ newIsr = new int[] {newLeader};
+ }
+ record.setLeader(newLeader);
+ }
+ record.setIsr(Replicas.toList(newIsr));
+ }
+ }
+ if (!currentReplicas.equals(newReplicas)) {
+ record.setReplicas(newReplicas);
+ }
+ // Check if there are any partition changes resulting from the above. If there
+ // are, add the appropriate record.
+ if (recordContainsChanges(record)) {
+ records.add(new ApiMessageAndVersion(record, (short) 0));
+ }
+ }
+
+ /**
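+ * Returns true if a partition change record actually changes something about
+ * the partition, i.e. it sets at least one of the ISR, leader, replicas,
+ * removing replicas, or adding replicas fields. Returns false if the record
+ * is a no-op.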
+ */
+ static boolean recordContainsChanges(PartitionChangeRecord record) {
+ if (record.isr() != null) return true;
+ if (record.leader() != NO_LEADER_CHANGE) return true;
+ if (record.replicas() != null) return true;
+ if (record.removingReplicas() != null) return true;
+ if (record.addingReplicas() != null) return true;
+ return false;
+ }
+
+ ListPartitionReassignmentsResponseData listPartitionReassignments(
+ List<ListPartitionReassignmentsTopics> topicList) {
+ if (topicList == null) {
+ return listAllPartitionReassignments();
+ }
+ ListPartitionReassignmentsResponseData response =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+ for (ListPartitionReassignmentsTopics topic : topicList) {
+ Uuid topicId = topicsByName.get(topic.name());
+ if (topicId != null) {
+ TopicControlInfo topicInfo = topics.get(topicId);
+ if (topicInfo == null) {
+ throw new RuntimeException("No topic entry found for " +
topicId);
+ }
+ OngoingTopicReassignment ongoingTopic = new
OngoingTopicReassignment().
+ setName(topic.name());
+ for (int partitionId : topic.partitionIndexes()) {
+ Optional<OngoingPartitionReassignment> ongoing =
+ getOngoingPartitionReassignment(topicInfo,
partitionId);
+ if (ongoing.isPresent()) {
+ ongoingTopic.partitions().add(ongoing.get());
+ }
+ }
+ if (!ongoingTopic.partitions().isEmpty()) {
+ response.topics().add(ongoingTopic);
+ }
+ }
+ }
+ return response;
+ }
+
+ ListPartitionReassignmentsResponseData listAllPartitionReassignments() {
Review comment:
Yes, I have refactored these to have much more in common
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]