junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r644352228



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+    private final Set<Integer> removing;
+    private final Set<Integer> adding;
+
+    RemovingAndAddingReplicas(int[] removing, int[] adding) {
+        this.removing = new HashSet<>();
+        for (int replica : removing) {
+            this.removing.add(replica);
+        }
+        this.adding = new HashSet<>();
+        for (int replica : adding) {
+            this.adding.add(replica);
+        }
+    }
+
+    RemovingAndAddingReplicas(List<Integer> removing, List<Integer> adding) {
+        this.removing = new HashSet<>(removing);
+        this.adding = new HashSet<>(adding);
+    }
+
+    /**
+     * Calculate what replicas need to be added and removed to reach a 
specific target
+     * replica list.
+     *
+     * @param currentReplicas   The current replica list.
+     * @param targetReplicas    The target replica list.
+     *
+     * @return                  An object containing the removing and adding 
replicas.
+     */
+    static RemovingAndAddingReplicas forTarget(List<Integer> currentReplicas,
+                                               List<Integer> targetReplicas) {
+        List<Integer> removingReplicas = new ArrayList<>();
+        List<Integer> addingReplicas = new ArrayList<>();
+        List<Integer> sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+        sortedCurrentReplicas.sort(Integer::compareTo);
+        List<Integer> sortedTargetReplicas = new ArrayList<>(targetReplicas);
+        sortedTargetReplicas.sort(Integer::compareTo);
+        int i = 0, j = 0;
+        while (true) {
+            if (i < sortedCurrentReplicas.size()) {
+                int currentReplica = sortedCurrentReplicas.get(i);
+                if (j < sortedTargetReplicas.size()) {
+                    int targetReplica = sortedTargetReplicas.get(j);
+                    if (currentReplica < targetReplica) {
+                        removingReplicas.add(currentReplica);
+                        i++;
+                    } else if (currentReplica > targetReplica) {
+                        addingReplicas.add(targetReplica);
+                        j++;
+                    } else {
+                        i++;
+                        j++;
+                    }
+                } else {
+                    removingReplicas.add(currentReplica);
+                    i++;
+                }
+            } else if (j < sortedTargetReplicas.size()) {
+                int targetReplica = sortedTargetReplicas.get(j);
+                addingReplicas.add(targetReplica);
+                j++;
+            } else {
+                break;
+            }
+        }
+        return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+    }
+
+    /**
+     * Calculate the merged replica list following a reassignment.
+     *
+     * The merged list will contain all of the target replicas, in the order 
they appear
+     * in the target list.  It will also contain existing replicas that are 
scheduled to
+     * be removed.
+     *
+     * If a removing replica was in position X in the original replica list, 
it will
+     * appear in the merged list following the appearance of X non-new 
replicas.

Review comment:
       The logic seems a bit complicated, and I am not sure how useful it is.
For example, if the current and target replicas are (1, 3, 4) and (1, 2, 4),
this method returns (1, 3, 2, 4). The old code calculates the merged replicas
simply as `(target ++ originReplicas).distinct`, which would give (1, 2, 4, 3).
Is there any benefit to the new way of merging replicas?
   

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), 
OptionalInt.empty());
+        // Calculate the replicas to add and remove.
+        List<Integer> currentReplicas = 
Replicas.toList(partitionInfo.replicas);
+        RemovingAndAddingReplicas removingAndAdding =
+            RemovingAndAddingReplicas.forTarget(currentReplicas, 
partition.replicas());
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(part.topicId()).
+            setPartitionId(part.partitionId());
+        List<Integer> removing = removingAndAdding.removingAsList();
+        if (!removing.isEmpty()) record.setRemovingReplicas(removing);
+        List<Integer> adding = removingAndAdding.addingAsList();
+        if (!adding.isEmpty()) record.setAddingReplicas(adding);
+
+        // Calculate the merged replica list. This may involve reordering 
existing
+        // replicas.
+        List<Integer> newReplicas = removingAndAdding.
+            calculateMergedReplicas(currentReplicas, partition.replicas());
+        PartitionControlInfo nextPartitionInfo = partitionInfo.merge(record);
+        if 
(nextPartitionInfo.isrChangeCompletesReassignment(nextPartitionInfo.isr)) {
+            // Handle partition assignments which must be completed 
immediately.
+            // These assignments don't add any replicas, and don't remove 
replicas critical
+            // to maintaining a non-empty ISR.
+            record.setRemovingReplicas(null);
+            record.setAddingReplicas(null);
+            int[] newReplicasArray = 
Replicas.copyWithout(nextPartitionInfo.replicas,
+                nextPartitionInfo.removingReplicas);
+            newReplicas = Replicas.toList(newReplicasArray);
+            int[] newIsr = Replicas.copyWithout(nextPartitionInfo.isr,
+                nextPartitionInfo.removingReplicas);
+            if (!Arrays.equals(nextPartitionInfo.isr, newIsr)) {
+                // Check if we need to elect a new leader.
+                if (!Replicas.contains(newIsr, partitionInfo.leader)) {
+                    int newLeader = bestLeader(newReplicasArray, newIsr,
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, newIsr)) {
+                        newIsr = new int[] {newLeader};
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(Replicas.toList(newIsr));
+            }
+        }
+        if (!currentReplicas.equals(newReplicas)) {
+            record.setReplicas(newReplicas);
+        }
+        // Check if there are any partition changes resulting from the above. 
If there
+        // are, add the appropriate record.
+        if (recordContainsChanges(record)) {
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+        }
+    }
+
+    /**
+     * Returns true if a partition change record actually changes something about
+     * the partition, i.e. if it sets any of the ISR, leader, replicas,
+     * removing replicas, or adding replicas fields.
+     */
+    static boolean recordContainsChanges(PartitionChangeRecord record) {
+        if (record.isr() != null) return true;
+        if (record.leader() != NO_LEADER_CHANGE) return true;
+        if (record.replicas() != null) return true;
+        if (record.removingReplicas() != null) return true;
+        if (record.addingReplicas() != null) return true;
+        return false;
+    }
+
+    ListPartitionReassignmentsResponseData listPartitionReassignments(
+            List<ListPartitionReassignmentsTopics> topicList) {
+        if (topicList == null) {
+            return listAllPartitionReassignments();
+        }
+        ListPartitionReassignmentsResponseData response =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+        for (ListPartitionReassignmentsTopics topic : topicList) {
+            Uuid topicId = topicsByName.get(topic.name());
+            if (topicId != null) {
+                TopicControlInfo topicInfo = topics.get(topicId);
+                if (topicInfo == null) {
+                    throw new RuntimeException("No topic entry found for " + 
topicId);

Review comment:
       Should we set an error code in the response instead of throwing an 
exception?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), 
OptionalInt.empty());

Review comment:
       Should we further verify that after the reassignment, all partitions for 
the same topic have the same number of replicas?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {

Review comment:
       We have this logic (resetting the ISR to just the new leader after an
unclean election) duplicated in multiple places, and it's easy to forget after
each leader election. Could we factor it into bestLeader() so that it returns
both the new leader and the new ISR?

##########
File path: 
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##########
@@ -29,6 +29,15 @@
       "about": "null if the ISR didn't change; the new in-sync replicas 
otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": 
"brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": 
"brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 2,

Review comment:
       Should versions start from 1+ for the new fields?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -235,6 +256,41 @@ int preferredReplica() {
             return replicas.length == 0 ? NO_LEADER : replicas[0];
         }
 
+        boolean isReassigning() {
+            return removingReplicas.length > 0 || addingReplicas.length > 0;
+        }
+
+        /**
+         * Check if an ISR change completes this partition's reassignment.
+         *
+         * @param newIsr    The new ISR.
+         * @return          True if the reassignment is complete.
+         */
+        boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
       The old controller disables auto leader balancing when a partition 
reassignment is in progress. Should we do the same thing here?

##########
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##########
@@ -27,9 +27,9 @@
       "about": "The replicas of this partition, sorted by preferred order." },
     { "name": "Isr", "type":  "[]int32", "versions":  "0+",
       "about": "The in-sync replicas of this partition" },
-    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"nullableVersions": "0+", "entityType": "brokerId",
+    { "name": "RemovingReplicas", "type":  "[]int32", "versions":  "0+", 
"entityType": "brokerId",

Review comment:
       Do we need to bump up the version, since we are removing nullableVersions?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/RemovingAndAddingReplicas.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+
+class RemovingAndAddingReplicas {
+    private final Set<Integer> removing;
+    private final Set<Integer> adding;
+
+    RemovingAndAddingReplicas(int[] removing, int[] adding) {
+        this.removing = new HashSet<>();
+        for (int replica : removing) {
+            this.removing.add(replica);
+        }
+        this.adding = new HashSet<>();
+        for (int replica : adding) {
+            this.adding.add(replica);
+        }
+    }
+
+    RemovingAndAddingReplicas(List<Integer> removing, List<Integer> adding) {
+        this.removing = new HashSet<>(removing);
+        this.adding = new HashSet<>(adding);
+    }
+
+    /**
+     * Calculate what replicas need to be added and removed to reach a 
specific target
+     * replica list.
+     *
+     * @param currentReplicas   The current replica list.
+     * @param targetReplicas    The target replica list.
+     *
+     * @return                  An object containing the removing and adding 
replicas.
+     */
+    static RemovingAndAddingReplicas forTarget(List<Integer> currentReplicas,
+                                               List<Integer> targetReplicas) {
+        List<Integer> removingReplicas = new ArrayList<>();
+        List<Integer> addingReplicas = new ArrayList<>();
+        List<Integer> sortedCurrentReplicas = new ArrayList<>(currentReplicas);
+        sortedCurrentReplicas.sort(Integer::compareTo);
+        List<Integer> sortedTargetReplicas = new ArrayList<>(targetReplicas);
+        sortedTargetReplicas.sort(Integer::compareTo);
+        int i = 0, j = 0;
+        while (true) {
+            if (i < sortedCurrentReplicas.size()) {
+                int currentReplica = sortedCurrentReplicas.get(i);
+                if (j < sortedTargetReplicas.size()) {
+                    int targetReplica = sortedTargetReplicas.get(j);
+                    if (currentReplica < targetReplica) {
+                        removingReplicas.add(currentReplica);
+                        i++;
+                    } else if (currentReplica > targetReplica) {
+                        addingReplicas.add(targetReplica);
+                        j++;
+                    } else {
+                        i++;
+                        j++;
+                    }
+                } else {
+                    removingReplicas.add(currentReplica);
+                    i++;
+                }
+            } else if (j < sortedTargetReplicas.size()) {
+                int targetReplica = sortedTargetReplicas.get(j);
+                addingReplicas.add(targetReplica);
+                j++;
+            } else {
+                break;
+            }
+        }
+        return new RemovingAndAddingReplicas(removingReplicas, addingReplicas);
+    }
+
+    /**
+     * Calculate the merged replica list following a reassignment.
+     *
+     * The merged list will contain all of the target replicas, in the order 
they appear
+     * in the target list.  It will also contain existing replicas that are 
scheduled to
+     * be removed.
+     *
+     * If a removing replica was in position X in the original replica list, 
it will
+     * appear in the merged list following the appearance of X non-new 
replicas.
+     *
+     * @param currentReplicas       The current replica list.
+     * @param targetReplicas        The target replica list.
+     *
+     * @return                      The merged replica list.
+     */
+    List<Integer> calculateMergedReplicas(List<Integer> currentReplicas,

Review comment:
       The old class ReplicaAssignment stores the current and target replica lists 
as well. That way, callers don't need to pass the current and target lists again 
to compute the merged replicas, which eliminates a source of inconsistency. Could 
we do the same here?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,

Review comment:
       Could we add some comments to document the partition reassignment flow? 
For example: what triggers the new replicas to be added, what triggers the 
completion of the reassignment, how the old replicas are deleted, etc.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -711,56 +830,73 @@ BrokersToIsrs brokersToIsrs() {
             }
             TopicControlInfo topic = topics.get(topicId);
             for (AlterIsrRequestData.PartitionData partitionData : 
topicData.partitions()) {
-                PartitionControlInfo partition = 
topic.parts.get(partitionData.partitionIndex());
+                int partitionId = partitionData.partitionIndex();
+                PartitionControlInfo partition = topic.parts.get(partitionId);
                 if (partition == null) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
                     continue;
                 }
                 if (request.brokerId() != partition.leader) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (partitionData.leaderEpoch() != partition.leaderEpoch) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+                        setPartitionIndex(partitionId).
+                        setErrorCode(FENCED_LEADER_EPOCH.code()));
                     continue;
                 }
                 if (partitionData.currentIsrVersion() != 
partition.partitionEpoch) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
                     continue;
                 }
                 int[] newIsr = Replicas.toArray(partitionData.newIsr());
                 if (!Replicas.validateIsr(partition.replicas, newIsr)) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (!Replicas.contains(newIsr, partition.leader)) {
                     // An alterIsr request can't remove the current leader.
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                        setPartitionIndex(partitionData.partitionIndex()).
+                        setPartitionIndex(partitionId).
                         setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
-                records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
-                    setPartitionId(partitionData.partitionIndex()).
-                    setTopicId(topic.id).
-                    setIsr(partitionData.newIsr()), (short) 0));
-                responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
-                    setPartitionIndex(partitionData.partitionIndex()).
-                    setErrorCode(Errors.NONE.code()).
-                    setLeaderId(partition.leader).
-                    setLeaderEpoch(partition.leaderEpoch).
-                    setCurrentIsrVersion(partition.partitionEpoch + 1).
-                    setIsr(partitionData.newIsr()));
+                PartitionChangeRecord record = 
generateLeaderAndIsrUpdate(topic,

Review comment:
       In the old logic, when the reassignment completes, we bump up the leader 
epoch even if the leader doesn't need to change, to prevent the removed replicas 
from being added back to the ISR unnecessarily. 
generateLeaderAndIsrUpdate() doesn't seem to do the same thing?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -343,12 +409,21 @@ public String toString() {
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+        this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.reassigningPartitionCount = new TimelineInteger(snapshotRegistry);
     }
 
     public void replay(TopicRecord record) {
-        topicsByName.put(record.name(), record.topicId());
-        topics.put(record.topicId(),
+        Uuid prevTopicId = topicsByName.get(record.name());
+        if (prevTopicId != null) {
+            replay(new RemoveTopicRecord().setTopicId(prevTopicId));
+        }
+        TopicControlInfo prevTopic = topics.put(record.topicId(),
             new TopicControlInfo(record.name(), snapshotRegistry, 
record.topicId()));
+        if (prevTopic != null) {

Review comment:
       Hmm, in this case the topicId is the same, so by removing the topic we 
are deleting the current topicId. Perhaps we should throw an 
IllegalStateException since this is not expected?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
##########
@@ -90,6 +97,13 @@ public Integer value() {
                 return preferredReplicaImbalanceCount;
             }
         });
+        this.reassigningPartitionCountGauge = omitBrokerMetrics ? null :

Review comment:
       So, in combined mode, we won't report REASSIGNING_PARTITION_COUNT? 
It seems useful in combined mode too.

##########
File path: 
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
##########
@@ -29,6 +29,15 @@
       "about": "null if the ISR didn't change; the new in-sync replicas 
otherwise." },
     { "name": "Leader", "type": "int32", "default": "-2", "entityType": 
"brokerId",
       "versions": "0+", "taggedVersions": "0+", "tag": 1,
-      "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." }
+      "about": "-1 if there is now no leader; -2 if the leader didn't change; 
the new leader otherwise." },
+    { "name": "Replicas", "type": "[]int32", "default": "null", "entityType": 
"brokerId",
+      "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 2,
+      "about": "null if the replicas didn't change; the new replicas 
otherwise." },
+    { "name": "RemovingReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",

Review comment:
       Why is RemovingReplicas nullable here, but not in PartitionRecord.json?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1111,19 +1111,19 @@ private QuorumController(LogContext logContext,
         }
         return appendWriteEvent("alterPartitionReassignments",
             time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), 
MILLISECONDS),
-            () -> {
-                throw new UnsupportedOperationException();
-            });
+            () -> replicationControl.alterPartitionReassignments(request));
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData 
request) {
+        if (request.topics() != null && request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new 
ListPartitionReassignmentsResponseData().setErrorMessage(null));

Review comment:
       Is there a need for setErrorMessage(null)? It seems the error message 
defaults to null.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1204,6 +1317,298 @@ void generateLeaderAndIsrUpdates(String context,
         }
     }
 
+    PartitionChangeRecord generateLeaderAndIsrUpdate(TopicControlInfo topic,
+                                                     int partitionId,
+                                                     PartitionControlInfo 
partition,
+                                                     int[] newIsr,
+                                                     Function<Integer, 
Boolean> isAcceptableLeader) {
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topic.id).
+            setPartitionId(partitionId);
+        int[] newReplicas = partition.replicas;
+        if (partition.isrChangeCompletesReassignment(newIsr)) {
+            if (partition.addingReplicas.length > 0) {
+                record.setAddingReplicas(Collections.emptyList());
+            }
+            if (partition.removingReplicas.length > 0) {
+                record.setRemovingReplicas(Collections.emptyList());
+                newIsr = Replicas.copyWithout(newIsr, 
partition.removingReplicas);
+                newReplicas = Replicas.copyWithout(partition.replicas, 
partition.removingReplicas);
+            }
+        }
+        int newLeader;
+        if (Replicas.contains(newIsr, partition.leader)) {
+            // If the current leader is good, don't change.
+            newLeader = partition.leader;
+        } else {
+            // Choose a new leader.
+            boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+            newLeader = bestLeader(newReplicas, newIsr, uncleanOk, 
isAcceptableLeader);
+        }
+        if (!electionWasClean(newLeader, newIsr)) {
+            // After an unclean leader election, the ISR is reset to just the 
new leader.
+            newIsr = new int[] {newLeader};
+        } else if (newIsr.length == 0) {
+            // We never want to shrink the ISR to size 0.
+            newIsr = partition.isr;
+        }
+        if (newLeader != partition.leader) record.setLeader(newLeader);
+        if (!Arrays.equals(newIsr, partition.isr)) {
+            record.setIsr(Replicas.toList(newIsr));
+        }
+        if (!Arrays.equals(newReplicas, partition.replicas)) {
+            record.setReplicas(Replicas.toList(newReplicas));
+        }
+        return record;
+    }
+
+    ControllerResult<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData 
request) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AlterPartitionReassignmentsResponseData result =
+                new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null);
+        int successfulAlterations = 0, totalAlterations = 0;
+        for (ReassignableTopic topic : request.topics()) {
+            ReassignableTopicResponse topicResponse = new 
ReassignableTopicResponse().
+                setName(topic.name());
+            for (ReassignablePartition partition : topic.partitions()) {
+                ApiError error = ApiError.NONE;
+                try {
+                    alterPartitionReassignment(topic.name(), partition, 
records);
+                    successfulAlterations++;
+                } catch (Throwable e) {
+                    log.info("Unable to alter partition reassignment for " +
+                        topic.name() + ":" + partition.partitionIndex() + " 
because " +
+                        "of an " + e.getClass().getSimpleName() + " error: " + 
e.getMessage());
+                    error = ApiError.fromThrowable(e);
+                }
+                totalAlterations++;
+                topicResponse.partitions().add(new 
ReassignablePartitionResponse().
+                    setPartitionIndex(partition.partitionIndex()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+            result.responses().add(topicResponse);
+        }
+        log.info("Successfully altered {} out of {} partition 
reassignment(s).",
+            successfulAlterations, totalAlterations);
+        return ControllerResult.atomicOf(records, result);
+    }
+
+    void alterPartitionReassignment(String topicName,
+                                    ReassignablePartition partition,
+                                    List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topicName);
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "named " + topicName + ".");
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find a topic 
" +
+                "with ID " + topicId + ".");
+        }
+        TopicIdPartition part = new TopicIdPartition(topicId, 
partition.partitionIndex());
+        PartitionControlInfo partitionInfo = 
topicInfo.parts.get(partition.partitionIndex());
+        if (partitionInfo == null) {
+            throw new UnknownTopicOrPartitionException("Unable to find 
partition " +
+                topicName + ":" + partition.partitionIndex() + ".");
+        }
+        if (partition.replicas() == null) {
+            cancelPartitionReassignment(topicName, part, partitionInfo, 
records);
+        } else {
+            changePartitionReassignment(topicName, part, partitionInfo, 
partition, records);
+        }
+    }
+
+    void cancelPartitionReassignment(String topicName,
+                                     TopicIdPartition topicIdPartition,
+                                     PartitionControlInfo partition,
+                                     List<ApiMessageAndVersion> records) {
+        if (!partition.isReassigning()) {
+            throw new 
NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message());
+        }
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(topicIdPartition.topicId()).
+            setPartitionId(topicIdPartition.partitionId());
+        if (partition.removingReplicas.length > 0) {
+            record.setRemovingReplicas(Collections.emptyList());
+        }
+        if (partition.addingReplicas.length > 0) {
+            record.setAddingReplicas(Collections.emptyList());
+        }
+        RemovingAndAddingReplicas removingAndAddingReplicas =
+            new RemovingAndAddingReplicas(partition.removingReplicas, 
partition.addingReplicas);
+        List<Integer> currentReplicas = Replicas.toList(partition.replicas);
+        List<Integer> currentIsr = Replicas.toList(partition.isr);
+        List<Integer> revertedReplicas = removingAndAddingReplicas.
+            calculateRevertedReplicas(currentReplicas, currentIsr);
+        if (!revertedReplicas.equals(currentReplicas)) {
+            record.setReplicas(revertedReplicas);
+            List<Integer> newIsr = new ArrayList<>();
+            for (int replica : partition.isr) {
+                if (revertedReplicas.contains(replica)) {
+                    newIsr.add(replica);
+                }
+            }
+            if (!newIsr.equals(currentIsr)) {
+                if (!newIsr.contains(partition.leader)) {
+                    int newLeader = 
bestLeader(Replicas.toArray(revertedReplicas),
+                        Replicas.toArray(newIsr),
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, 
Replicas.toArray(newIsr))) {
+                        newIsr = Collections.singletonList(newLeader);
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(newIsr);
+            }
+        }
+        records.add(new ApiMessageAndVersion(record, (short) 0));
+    }
+
+    void changePartitionReassignment(String topicName,
+                                     TopicIdPartition part,
+                                     PartitionControlInfo partitionInfo,
+                                     ReassignablePartition partition,
+                                     List<ApiMessageAndVersion> records) {
+        // Check that the requested partition assignment is valid.
+        validateManualPartitionAssignment(partition.replicas(), 
OptionalInt.empty());
+        // Calculate the replicas to add and remove.
+        List<Integer> currentReplicas = 
Replicas.toList(partitionInfo.replicas);
+        RemovingAndAddingReplicas removingAndAdding =
+            RemovingAndAddingReplicas.forTarget(currentReplicas, 
partition.replicas());
+        PartitionChangeRecord record = new PartitionChangeRecord().
+            setTopicId(part.topicId()).
+            setPartitionId(part.partitionId());
+        List<Integer> removing = removingAndAdding.removingAsList();
+        if (!removing.isEmpty()) record.setRemovingReplicas(removing);
+        List<Integer> adding = removingAndAdding.addingAsList();
+        if (!adding.isEmpty()) record.setAddingReplicas(adding);
+
+        // Calculate the merged replica list. This may involve reordering 
existing
+        // replicas.
+        List<Integer> newReplicas = removingAndAdding.
+            calculateMergedReplicas(currentReplicas, partition.replicas());
+        PartitionControlInfo nextPartitionInfo = partitionInfo.merge(record);
+        if 
(nextPartitionInfo.isrChangeCompletesReassignment(nextPartitionInfo.isr)) {
+            // Handle partition assignments which must be completed 
immediately.
+            // These assignments don't add any replicas, and don't remove 
replicas critical
+            // to maintaining a non-empty ISR.
+            record.setRemovingReplicas(null);
+            record.setAddingReplicas(null);
+            int[] newReplicasArray = 
Replicas.copyWithout(nextPartitionInfo.replicas,
+                nextPartitionInfo.removingReplicas);
+            newReplicas = Replicas.toList(newReplicasArray);
+            int[] newIsr = Replicas.copyWithout(nextPartitionInfo.isr,
+                nextPartitionInfo.removingReplicas);
+            if (!Arrays.equals(nextPartitionInfo.isr, newIsr)) {
+                // Check if we need to elect a new leader.
+                if (!Replicas.contains(newIsr, partitionInfo.leader)) {
+                    int newLeader = bestLeader(newReplicasArray, newIsr,
+                        
configurationControl.uncleanLeaderElectionEnabledForTopic(topicName),
+                        r -> clusterControl.unfenced(r));
+                    if (!electionWasClean(newLeader, newIsr)) {
+                        newIsr = new int[] {newLeader};
+                    }
+                    record.setLeader(newLeader);
+                }
+                record.setIsr(Replicas.toList(newIsr));
+            }
+        }
+        if (!currentReplicas.equals(newReplicas)) {
+            record.setReplicas(newReplicas);
+        }
+        // Check if there are any partition changes resulting from the above. 
If there
+        // are, add the appropriate record.
+        if (recordContainsChanges(record)) {
+            records.add(new ApiMessageAndVersion(record, (short) 0));
+        }
+    }
+
+    /**
+     * Returns true if a partition change record doesn't actually change 
anything about
+     * the partition.
+     */
+    static boolean recordContainsChanges(PartitionChangeRecord record) {
+        if (record.isr() != null) return true;
+        if (record.leader() != NO_LEADER_CHANGE) return true;
+        if (record.replicas() != null) return true;
+        if (record.removingReplicas() != null) return true;
+        if (record.addingReplicas() != null) return true;
+        return false;
+    }
+
+    ListPartitionReassignmentsResponseData listPartitionReassignments(
+            List<ListPartitionReassignmentsTopics> topicList) {
+        if (topicList == null) {
+            return listAllPartitionReassignments();
+        }
+        ListPartitionReassignmentsResponseData response =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+        for (ListPartitionReassignmentsTopics topic : topicList) {
+            Uuid topicId = topicsByName.get(topic.name());
+            if (topicId != null) {
+                TopicControlInfo topicInfo = topics.get(topicId);
+                if (topicInfo == null) {
+                    throw new RuntimeException("No topic entry found for " + 
topicId);
+                }
+                OngoingTopicReassignment ongoingTopic = new 
OngoingTopicReassignment().
+                    setName(topic.name());
+                for (int partitionId : topic.partitionIndexes()) {
+                    Optional<OngoingPartitionReassignment> ongoing =
+                        getOngoingPartitionReassignment(topicInfo, 
partitionId);
+                    if (ongoing.isPresent()) {
+                        ongoingTopic.partitions().add(ongoing.get());
+                    }
+                }
+                if (!ongoingTopic.partitions().isEmpty()) {
+                    response.topics().add(ongoingTopic);
+                }
+            }
+        }
+        return response;
+    }
+
+    ListPartitionReassignmentsResponseData listAllPartitionReassignments() {

Review comment:
       This method is almost the same as listPartitionReassignments(). Could we 
reuse the logic somehow?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to