junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r667223580



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), 
OptionalInt.empty());

Review comment:
       Thanks, Colin. It seems that we don't currently enforce the same
replication factor across all partitions of a topic. So we can keep the logic
as it is and file a follow-up JIRA to improve this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to