junrao commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r578693886



##########
File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+
+public class FeatureControlManager {
+    /**
+     * The features supported by this controller's software.
+     */
+    private final Map<String, VersionRange> supportedFeatures;
+
+    /**
+     * Maps feature names to finalized version ranges.
+     */
+    private final TimelineHashMap<String, VersionRange> finalizedVersions;
+
+    /**
+     * The latest feature epoch.
+     */
+    private final TimelineHashSet<Long> epoch;
+
+    FeatureControlManager(Map<String, VersionRange> supportedFeatures,
+                          SnapshotRegistry snapshotRegistry) {
+        this.supportedFeatures = supportedFeatures;
+        this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.epoch = new TimelineHashSet<>(snapshotRegistry, 0);
+    }
+
+    ControllerResult<Map<String, ApiError>> updateFeatures(
+            Map<String, VersionRange> updates, Set<String> downgradeables,
+            Map<Integer, Map<String, VersionRange>> brokerFeatures) {
+        TreeMap<String, ApiError> results = new TreeMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, VersionRange> entry : updates.entrySet()) {
+            results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
+                downgradeables.contains(entry.getKey()), brokerFeatures, records));
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    private ApiError updateFeature(String featureName,
+                                   VersionRange newRange,
+                                   boolean downgradeable,
+                                   Map<Integer, Map<String, VersionRange>> brokerFeatures,
+                                   List<ApiMessageAndVersion> records) {
+        if (newRange.min() <= 0) {
+            return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "The lower value for the new range cannot be less than 1.");
+        }
+        if (newRange.max() <= 0) {
+            return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "The upper value for the new range cannot be less than 1.");
+        }
+        VersionRange localRange = supportedFeatures.get(featureName);

Review comment:
       This can be revisited later. When finalizing a feature, should we consider other controllers' supported features too?
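
   To make the question concrete, here is a minimal sketch of what a cross-controller check could look like (all names here are hypothetical, not part of this PR; only `VersionRange.min()`/`max()` is taken from the code above):

   ```java
   // Hypothetical sketch: before finalizing a feature, require that every known
   // controller's supported range covers the requested range, not just the local one.
   static boolean allControllersSupport(String featureName,
                                        VersionRange newRange,
                                        Map<Integer, Map<String, VersionRange>> controllerFeatures) {
       for (Map<String, VersionRange> supported : controllerFeatures.values()) {
           VersionRange range = supported.get(featureName);
           // A controller that lacks the feature, or whose supported range does
           // not cover the requested range, would block finalization.
           if (range == null || range.min() > newRange.min() || range.max() < newRange.max()) {
               return false;
           }
       }
       return true;
   }
   ```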

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -0,0 +1,925 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+
+public final class QuorumController implements Controller {

Review comment:
       Could we add some comments for this class?
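
   As a starting point, something along these lines could work (wording is only a suggestion inferred from the imports and surrounding code, not taken from this PR):

   ```java
   /**
    * QuorumController implements the main logic of the KIP-500 controller quorum.
    * All events are processed on a single thread via a KafkaEventQueue; the
    * resulting metadata records are written through the MetaLogManager, and the
    * in-memory state is kept in timeline data structures registered with a
    * SnapshotRegistry so that it can be reverted if a write fails.
    */
   public final class QuorumController implements Controller {
       // ...
   }
   ```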

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+    static class TopicControlInfo {
+        private final Uuid id;
+        private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+        TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+            this.id = id;
+            this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+    }
+
+    static class PartitionControlInfo {
+        private final int[] replicas;
+        private final int[] isr;
+        private final int[] removingReplicas;
+        private final int[] addingReplicas;
+        private final int leader;
+        private final int leaderEpoch;
+        private final int partitionEpoch;
+
+        PartitionControlInfo(PartitionRecord record) {
+            this(Replicas.toArray(record.replicas()),
+                Replicas.toArray(record.isr()),
+                Replicas.toArray(record.removingReplicas()),
+                Replicas.toArray(record.addingReplicas()),
+                record.leader(),
+                record.leaderEpoch(),
+                record.partitionEpoch());
+        }
+
+        PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+                             int[] addingReplicas, int leader, int leaderEpoch,
+                             int partitionEpoch) {
+            this.replicas = replicas;
+            this.isr = isr;
+            this.removingReplicas = removingReplicas;
+            this.addingReplicas = addingReplicas;
+            this.leader = leader;
+            this.leaderEpoch = leaderEpoch;
+            this.partitionEpoch = partitionEpoch;
+        }
+
+        PartitionControlInfo merge(PartitionChangeRecord record) {
+            return new PartitionControlInfo(replicas,
+                Replicas.toArray(record.isr()),
+                removingReplicas,
+                addingReplicas,
+                record.leader(),
+                record.leaderEpoch(),
+                record.partitionEpoch());
+        }
+
+        String diff(PartitionControlInfo prev) {
+            StringBuilder builder = new StringBuilder();
+            String prefix = "";
+            if (!Arrays.equals(replicas, prev.replicas)) {
+                builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+                prefix = ", ";
+                builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
+            }
+            if (!Arrays.equals(isr, prev.isr)) {
+                builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+                prefix = ", ";
+                builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
+            }
+            if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
+                builder.append(prefix).append("oldRemovingReplicas=").
+                    append(Arrays.toString(prev.removingReplicas));
+                prefix = ", ";
+                builder.append(prefix).append("newRemovingReplicas=").
+                    append(Arrays.toString(removingReplicas));
+            }
+            if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
+                builder.append(prefix).append("oldAddingReplicas=").
+                    append(Arrays.toString(prev.addingReplicas));
+                prefix = ", ";
+                builder.append(prefix).append("newAddingReplicas=").
+                    append(Arrays.toString(addingReplicas));
+            }
+            if (leader != prev.leader) {
+                builder.append(prefix).append("oldLeader=").append(prev.leader);
+                prefix = ", ";
+                builder.append(prefix).append("newLeader=").append(leader);
+            }
+            if (leaderEpoch != prev.leaderEpoch) {
+                builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+                prefix = ", ";
+                builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
+            }
+            if (partitionEpoch != prev.partitionEpoch) {
+                builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
+                prefix = ", ";
+                builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+            }
+            return builder.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(replicas, isr, removingReplicas, addingReplicas, leader,
+                leaderEpoch, partitionEpoch);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof PartitionControlInfo)) return false;
+            PartitionControlInfo other = (PartitionControlInfo) o;
+            return diff(other).isEmpty();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder("PartitionControlInfo(");
+            builder.append("replicas=").append(Arrays.toString(replicas));
+            builder.append(", isr=").append(Arrays.toString(isr));
+            builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas));
+            builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas));
+            builder.append(", leader=").append(leader);
+            builder.append(", leaderEpoch=").append(leaderEpoch);
+            builder.append(", partitionEpoch=").append(partitionEpoch);
+            builder.append(")");
+            return builder.toString();
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+    private final Logger log;
+
+    /**
+     * The random number generator used by this object.
+     */
+    private final Random random;
+
+    /**
+     * The KIP-464 default replication factor that is used if a CreateTopics request does
+     * not specify one.
+     */
+    private final short defaultReplicationFactor;
+
+    /**
+     * The KIP-464 default number of partitions that is used if a CreateTopics request does
+     * not specify a number of partitions.
+     */
+    private final int defaultNumPartitions;
+
+    /**
+     * A reference to the controller's configuration control manager.
+     */
+    private final ConfigurationControlManager configurationControl;
+
+    /**
+     * A reference to the controller's cluster control manager.
+     */
+    final ClusterControlManager clusterControl;
+
+    /**
+     * Maps topic names to topic UUIDs.
+     */
+    private final TimelineHashMap<String, Uuid> topicsByName;
+
+    /**
+     * Maps topic UUIDs to structures containing topic information, including partitions.
+     */
+    private final TimelineHashMap<Uuid, TopicControlInfo> topics;
+
+    /**
+     * A map of broker IDs to the partitions that the broker is in the ISR for.
+     */
+    private final BrokersToIsrs brokersToIsrs;
+
+    ReplicationControlManager(SnapshotRegistry snapshotRegistry,
+                              LogContext logContext,
+                              Random random,
+                              short defaultReplicationFactor,
+                              int defaultNumPartitions,
+                              ConfigurationControlManager configurationControl,
+                              ClusterControlManager clusterControl) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.log = logContext.logger(ReplicationControlManager.class);
+        this.random = random;
+        this.defaultReplicationFactor = defaultReplicationFactor;
+        this.defaultNumPartitions = defaultNumPartitions;
+        this.configurationControl = configurationControl;
+        this.clusterControl = clusterControl;
+        this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+    }
+
+    public void replay(TopicRecord record) {
+        topicsByName.put(record.name(), record.topicId());
+        topics.put(record.topicId(), new TopicControlInfo(snapshotRegistry, record.topicId()));
+        log.info("Created topic {} with ID {}.", record.name(), record.topicId());
+    }
+
+    public void replay(PartitionRecord record) {
+        TopicControlInfo topicInfo = topics.get(record.topicId());
+        if (topicInfo == null) {
+            throw new RuntimeException("Tried to create partition " + record.topicId() +
+                ":" + record.partitionId() + ", but no topic with that ID was found.");
+        }
+        PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
+        PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId());
+        if (prevPartInfo == null) {
+            log.info("Created partition {}:{} with {}.", record.topicId(),
+                record.partitionId(), newPartInfo.toString());
+            topicInfo.parts.put(record.partitionId(), newPartInfo);
+            brokersToIsrs.update(record.topicId(), record.partitionId(), null,
+                newPartInfo.isr, -1, newPartInfo.leader);
+        } else {
+            String diff = newPartInfo.diff(prevPartInfo);
+            if (!diff.isEmpty()) {
+                log.info("Modified partition {}:{}: {}.", record.topicId(),
+                    record.partitionId(), diff);
+                topicInfo.parts.put(record.partitionId(), newPartInfo);
+                brokersToIsrs.update(record.topicId(), record.partitionId(),
+                    prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader,
+                    newPartInfo.leader);
+            }
+        }
+    }
+
+    public void replay(PartitionChangeRecord record) {
+        TopicControlInfo topicInfo = topics.get(record.topicId());
+        if (topicInfo == null) {
+            throw new RuntimeException("Tried to create partition " + record.topicId() +
+                ":" + record.partitionId() + ", but no topic with that ID was found.");
+        }
+        PartitionControlInfo prevPartitionInfo = topicInfo.parts.get(record.partitionId());
+        if (prevPartitionInfo == null) {
+            throw new RuntimeException("Tried to create partition " + record.topicId() +
+                ":" + record.partitionId() + ", but no partition with that id was found.");
+        }
+        PartitionControlInfo newPartitionInfo = prevPartitionInfo.merge(record);
+        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
+        brokersToIsrs.update(record.topicId(), record.partitionId(),
+            prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
+            newPartitionInfo.leader);
+        log.debug("Applied ISR change record: {}", record.toString());
+    }
+
+    ControllerResult<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        Map<String, ApiError> topicErrors = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        // Check the topic names.
+        validateNewTopicNames(topicErrors, request.topics());
+
+        // Identify topics that already exist and mark them with the appropriate error
+        request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
+                .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));
+
+        // Verify that the configurations for the new topics are OK, and figure out what
+        // ConfigRecords should be created.
+        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
+            computeConfigChanges(topicErrors, request.topics());
+        ControllerResult<Map<ConfigResource, ApiError>> configResult =
+            configurationControl.incrementalAlterConfigs(configChanges);
+        for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
+            if (entry.getValue().isFailure()) {
+                topicErrors.put(entry.getKey().name(), entry.getValue());
+            }
+        }
+        records.addAll(configResult.records());
+
+        // Try to create whatever topics are needed.
+        Map<String, CreatableTopicResult> successes = new HashMap<>();
+        for (CreatableTopic topic : request.topics()) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            ApiError error = createTopic(topic, records, successes);
+            if (error.isFailure()) {
+                topicErrors.put(topic.name(), error);
+            }
+        }
+
+        // Create responses for all topics.
+        CreateTopicsResponseData data = new CreateTopicsResponseData();
+        StringBuilder resultsBuilder = new StringBuilder();
+        String resultsPrefix = "";
+        for (CreatableTopic topic : request.topics()) {
+            ApiError error = topicErrors.get(topic.name());
+            if (error != null) {
+                data.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+                resultsBuilder.append(resultsPrefix).append(topic).append(": ").
+                    append(error.error()).append(" (").append(error.message()).append(")");
+                resultsPrefix = ", ";
+                continue;
+            }
+            CreatableTopicResult result = successes.get(topic.name());
+            data.topics().add(result);
+            resultsBuilder.append(resultsPrefix).append(topic).append(": ").
+                append("SUCCESS");
+            resultsPrefix = ", ";
+        }
+        log.info("createTopics result(s): {}", resultsBuilder.toString());
+        return new ControllerResult<>(records, data);
+    }
+
+    private ApiError createTopic(CreatableTopic topic,
+                                 List<ApiMessageAndVersion> records,
+                                 Map<String, CreatableTopicResult> successes) {
+        Map<Integer, PartitionControlInfo> newParts = new HashMap<>();
+        if (!topic.assignments().isEmpty()) {
+            if (topic.replicationFactor() != -1) {
+                return new ApiError(Errors.INVALID_REQUEST,
+                    "A manual partition assignment was specified, but replication " +
+                    "factor was not set to -1.");
+            }
+            if (topic.numPartitions() != -1) {
+                return new ApiError(Errors.INVALID_REQUEST,
+                    "A manual partition assignment was specified, but numPartitions " +
+                        "was not set to -1.");
+            }
+            for (CreatableReplicaAssignment assignment : topic.assignments()) {
+                if (newParts.containsKey(assignment.partitionIndex())) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                        "Found multiple manual partition assignments for partition " +
+                            assignment.partitionIndex());
+                }
+                HashSet<Integer> brokerIds = new HashSet<>();
+                for (int brokerId : assignment.brokerIds()) {
+                    if (!brokerIds.add(brokerId)) {
+                        return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "The manual partition assignment specifies the same node " +
+                                "id more than once.");
+                    } else if (!clusterControl.unfenced(brokerId)) {
+                        return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "The manual partition assignment contains node " + brokerId +
+                                ", but that node is not usable.");
+                    }
+                }
+                int[] replicas = new int[assignment.brokerIds().size()];
+                for (int i = 0; i < replicas.length; i++) {
+                    replicas[i] = assignment.brokerIds().get(i);
+                }
+                int[] isr = new int[assignment.brokerIds().size()];
+                for (int i = 0; i < replicas.length; i++) {
+                    isr[i] = assignment.brokerIds().get(i);
+                }
+                newParts.put(assignment.partitionIndex(),
+                    new PartitionControlInfo(replicas, isr, null, null, isr[0], 0, 0));
+            }
+        } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
+            return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
+                "Replication factor was set to an invalid non-positive value.");
+        } else if (!topic.assignments().isEmpty()) {
+            return new ApiError(Errors.INVALID_REQUEST,
+                "Replication factor was not set to -1 but a manual partition " +
+                    "assignment was specified.");
+        } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
+            return new ApiError(Errors.INVALID_PARTITIONS,
+                "Number of partitions was set to an invalid non-positive value.");
+        } else {
+            int numPartitions = topic.numPartitions() == -1 ?
+                defaultNumPartitions : topic.numPartitions();
+            short replicationFactor = topic.replicationFactor() == -1 ?
+                defaultReplicationFactor : topic.replicationFactor();
+            try {
+                List<List<Integer>> replicas = clusterControl.
+                    placeReplicas(numPartitions, replicationFactor);
+                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
+                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                    newParts.put(partitionId,
+                        new PartitionControlInfo(r, r, null, null, r[0], 0, 0));
+                }
+            } catch (InvalidReplicationFactorException e) {
+                return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
+                    "Unable to replicate the partition " + replicationFactor +
+                        " times: " + e.getMessage());
+            }
+        }
+        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        successes.put(topic.name(), new CreatableTopicResult().
+            setName(topic.name()).
+            setTopicId(topicId).
+            setErrorCode((short) 0).
+            setErrorMessage(null).
+            setNumPartitions(newParts.size()).
+            setReplicationFactor((short) newParts.get(0).replicas.length));
+        records.add(new ApiMessageAndVersion(new TopicRecord().
+            setName(topic.name()).
+            setTopicId(topicId), (short) 0));
+        for (Entry<Integer, PartitionControlInfo> partEntry : newParts.entrySet()) {
+            int partitionIndex = partEntry.getKey();
+            PartitionControlInfo info = partEntry.getValue();
+            records.add(new ApiMessageAndVersion(new PartitionRecord().
+                setPartitionId(partitionIndex).
+                setTopicId(topicId).
+                setReplicas(Replicas.toList(info.replicas)).
+                setIsr(Replicas.toList(info.isr)).
+                setRemovingReplicas(null).
+                setAddingReplicas(null).
+                setLeader(info.leader).
+                setLeaderEpoch(info.leaderEpoch).
+                setPartitionEpoch(0), (short) 0));
+        }
+        return ApiError.NONE;
+    }
+
+    static void validateNewTopicNames(Map<String, ApiError> topicErrors,
+                                      CreatableTopicCollection topics) {
+        for (CreatableTopic topic : topics) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            try {
+                Topic.validate(topic.name());
+            } catch (InvalidTopicException e) {
+                topicErrors.put(topic.name(),
+                    new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
+            }
+        }
+    }
+
+    static Map<ConfigResource, Map<String, Entry<OpType, String>>>
+            computeConfigChanges(Map<String, ApiError> topicErrors,
+                                 CreatableTopicCollection topics) {
+        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges = new HashMap<>();
+        for (CreatableTopic topic : topics) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>();
+            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
+                topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+            }
+            if (!topicConfigs.isEmpty()) {
+                configChanges.put(new ConfigResource(TOPIC, topic.name()), topicConfigs);
+            }
+        }
+        return configChanges;
+    }
+
+    // VisibleForTesting
+    PartitionControlInfo getPartition(Uuid topicId, int partitionId) {
+        TopicControlInfo topic = topics.get(topicId);
+        if (topic == null) {
+            return null;
+        }
+        return topic.parts.get(partitionId);
+    }
+
+    // VisibleForTesting
+    BrokersToIsrs brokersToIsrs() {
+        return brokersToIsrs;
+    }
+
+    ControllerResult<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
+        AlterIsrResponseData response = new AlterIsrResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (AlterIsrRequestData.TopicData topicData : request.topics()) {
+            AlterIsrResponseData.TopicData responseTopicData =
+                new AlterIsrResponseData.TopicData().setName(topicData.name());
+            response.topics().add(responseTopicData);
+            Uuid topicId = topicsByName.get(topicData.name());
+            if (topicId == null || !topics.containsKey(topicId)) {
+                for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                }
+                continue;
+            }
+            TopicControlInfo topic = topics.get(topicId);
+            for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
+                PartitionControlInfo partition = topic.parts.get(partitionData.partitionIndex());
+                if (partition == null) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                    continue;
+                }
+                if (partitionData.leaderEpoch() != partition.leaderEpoch) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+                    continue;
+                }
+                if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
+                    continue;
+                }
+                int[] newIsr = Replicas.toArray(partitionData.newIsr());
+                if (!Replicas.validateIsr(partition.replicas, newIsr)) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                }
+                if (!Replicas.contains(newIsr, partition.leader)) {
+                    // An alterIsr request can't remove the current leader.
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                }
+                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                    setPartitionId(partitionData.partitionIndex()).
+                    setTopicId(topic.id).
+                    setIsr(partitionData.newIsr()).
+                    setLeader(partition.leader).
+                    setLeaderEpoch(partition.leaderEpoch).
+                    setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+            }
+        }
+        return new ControllerResult<>(records, response);
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker being fenced.
+     *
+     * First, we remove this broker from any non-singleton ISR. Then we generate a
+     * FenceBrokerRecord.
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+
+    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        if (brokerRegistration == null) {
+            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
+        }
+        handleNodeDeactivated(brokerId, records);
+        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+            setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker being unregistered.
+     *
+     * First, we remove this broker from any non-singleton ISR. Then we generate an
+     * UnregisterBrokerRecord.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerUnregistered(int brokerId, long brokerEpoch,
+                                  List<ApiMessageAndVersion> records) {
+        handleNodeDeactivated(brokerId, records);
+        records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
+            setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0));
+    }
+
+    /**
+     * Handle a broker being deactivated. This means we remove it from any ISR that has
+     * more than one element. We do not remove the broker from ISRs where it is the only
+     * member since this would preclude clean leader election in the future.
+     * It is removed as the leader for all partitions it leads.
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+    void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
+        Iterator<TopicPartition> iterator = brokersToIsrs.iterator(brokerId, false);
+        while (iterator.hasNext()) {
+            TopicPartition topicPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicPartition.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicPartition +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
+            int newLeader, newLeaderEpoch;
+            if (newIsr.length == 0) {
+                // We don't want to shrink the ISR to size 0. So, leave the node in the
+                // ISR, but set the leader to -1 (no leader).
+                newIsr = partition.isr;
+                newLeader = -1;
+                newLeaderEpoch = partition.leaderEpoch + 1;
+            } else if (partition.leader == brokerId) {
+                // The fenced node will no longer be the leader.
+                newLeader = chooseNewLeader(partition, newIsr, false);
+                newLeaderEpoch = partition.leaderEpoch + 1;
+            } else {
+                // The fenced node wasn't the leader, so no leader change is needed.
+                newLeader = partition.leader;
+                newLeaderEpoch = partition.leaderEpoch;
+            }
+            records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                setPartitionId(topicPartition.partitionId()).
+                setTopicId(topic.id).
+                setIsr(Replicas.toList(newIsr)).
+                setLeader(newLeader).
+                setLeaderEpoch(newLeaderEpoch).
+                setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+        }
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker becoming unfenced.
+     *
+     * First, we create an UnfenceBrokerRecord. Then, we check if there are any
+     * partitions that don't currently have a leader that should be led by the newly
+     * unfenced broker.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
+        records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
+            setId(brokerId).setEpoch(brokerEpoch), (short) 0));
+        handleNodeActivated(brokerId, records);
+    }
+
+    /**
+     * Handle a broker being activated. This means we check if it can become the leader
+     * for any partition that currently has no leader (aka offline partition).
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+    void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
+        Iterator<TopicPartition> iterator = brokersToIsrs.noLeaderIterator();
+        while (iterator.hasNext()) {
+            TopicPartition topicPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicPartition.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicPartition +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            // TODO: if this partition is configured for unclean leader election,
+            // check the replica set rather than the ISR.
+            if (Replicas.contains(partition.isr, brokerId)) {
+                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                    setPartitionId(topicPartition.partitionId()).
+                    setTopicId(topic.id).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeader(brokerId).
+                    setLeaderEpoch(partition.leaderEpoch + 1).
+                    setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+            }
+        }
+    }
+
+    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
+        boolean unclean = electionIsUnclean(request.electionType());
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ElectLeadersResponseData response = new ElectLeadersResponseData();
+        for (TopicPartitions topic : request.topicPartitions()) {
+            ReplicaElectionResult topicResults =
+                new ReplicaElectionResult().setTopic(topic.topic());
+            response.replicaElectionResults().add(topicResults);
+            for (int partitionId : topic.partitions()) {
+                ApiError error = electLeader(topic.topic(), partitionId, unclean, records);
+                topicResults.partitionResult().add(new PartitionResult().
+                    setPartitionId(partitionId).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+            }
+        }
+        return new ControllerResult<>(records, response);
+    }
+
+    static boolean electionIsUnclean(byte electionType) {
+        ElectionType type;
+        try {
+            type = ElectionType.valueOf(electionType);
+        } catch (IllegalArgumentException e) {
+            throw new InvalidRequestException("Unknown election type " + (int) electionType);
+        }
+        return type == ElectionType.UNCLEAN;
+    }
+
+    ApiError electLeader(String topic, int partitionId, boolean unclean,
+                         List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic);
+        if (topicId == null) {
+            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                "No such topic as " + topic);
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                "No such topic id as " + topicId);
+        }
+        PartitionControlInfo partitionInfo = topicInfo.parts.get(partitionId);
+        if (partitionInfo == null) {
+            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                "No such partition as " + topic + "-" + partitionId);
+        }
+        int newLeader = chooseNewLeader(partitionInfo, partitionInfo.isr, unclean);
+        if (newLeader == partitionInfo.leader) {
+            if (newLeader < 0) {
+                return new ApiError(Errors.LEADER_NOT_AVAILABLE,
+                    "Unable to find any leader for the partition.");
+            } else {
+                return ApiError.NONE;
+            }
+        } else {
+            int[] newIsr = partitionInfo.isr;
+            if (!Replicas.contains(partitionInfo.isr, newLeader)) {
+                newIsr = new int[] {newLeader};
+            }
+            records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                setPartitionId(partitionId).
+                setTopicId(topicId).
+                setIsr(Replicas.toList(newIsr)).
+                setLeader(newLeader).
+                setLeaderEpoch(partitionInfo.leaderEpoch + 1).
+                setPartitionEpoch(partitionInfo.partitionEpoch + 1), (short) 0));
+            return ApiError.NONE;
+        }
+    }
+
+    int chooseNewLeader(PartitionControlInfo partition, int[] newIsr, boolean unclean) {
+        for (int i = 0; i < partition.replicas.length; i++) {
+            int replica = partition.replicas[i];
+            if (Replicas.contains(newIsr, replica)) {
+                return replica;
+            }
+        }
+        if (unclean) {
+            for (int i = 0; i < partition.replicas.length; i++) {
+                int replica = partition.replicas[i];
+                if (clusterControl.unfenced(replica)) {
+                    return replica;
+                }
+            }
+        }
+        return -1;
+    }
+
+    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
+                BrokerHeartbeatRequestData request, long lastCommittedOffset) {
+        int brokerId = request.brokerId();
+        long brokerEpoch = request.brokerEpoch();
+        clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
+        BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
+        BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
+            request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        if (states.current() != states.next()) {
+            switch (states.next()) {
+                case FENCED:
+                    handleBrokerFenced(brokerId, records);
+                    break;
+                case UNFENCED:
+                    handleBrokerUnfenced(brokerId, brokerEpoch, records);
+                    break;
+                case CONTROLLED_SHUTDOWN:
+                    handleNodeDeactivated(brokerId, records);

Review comment:
       > It seems like the remaining behavioral difference is that the new code will, if no other leader can be chosen, set the leader to -1 (offline). If we don't do this, controlled shutdown easily gets stuck if there are any partitions with replication factor = 1. Maybe we can tune this a bit later?

   It's fine to revisit that later. The tradeoff is that if we wait, it slightly increases the probability of preserving availability, since another replica could join the ISR.
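
   A minimal illustration of the tradeoff being weighed (names hypothetical, not from this PR): during controlled shutdown, a partition whose ISR would shrink to empty can either go offline immediately so shutdown proceeds, or keep its current leader and wait in the hope that another replica rejoins the ISR first.

   ```java
   import java.util.Arrays;

   // Sketch only: decide the next leader when "brokerId" leaves the ISR.
   static int nextLeader(int[] isr, int currentLeader, int brokerId, boolean waitForIsr) {
       int[] remaining = Arrays.stream(isr).filter(r -> r != brokerId).toArray();
       if (remaining.length > 0) {
           return remaining[0];      // clean election from the shrunken ISR
       }
       // Empty ISR: the PR picks -1 (offline) so shutdown can't get stuck on
       // replication-factor-1 partitions; waiting instead favors availability.
       return waitForIsr ? currentLeader : -1;
   }
   ```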

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+
+
+/**
+ * The ReplicationControlManager is the part of the controller which deals with topics
+ * and partitions. It is responsible for managing the in-sync replica set and leader
+ * of each partition, as well as administrative tasks like creating or deleting topics.
+ */
+public class ReplicationControlManager {
+    static class TopicControlInfo {
+        private final Uuid id;
+        private final TimelineHashMap<Integer, PartitionControlInfo> parts;
+
+        TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+            this.id = id;
+            this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+    }
+
+    static class PartitionControlInfo {
+        private final int[] replicas;
+        private final int[] isr;
+        private final int[] removingReplicas;
+        private final int[] addingReplicas;
+        private final int leader;
+        private final int leaderEpoch;
+        private final int partitionEpoch;
+
+        PartitionControlInfo(PartitionRecord record) {
+            this(Replicas.toArray(record.replicas()),
+                Replicas.toArray(record.isr()),
+                Replicas.toArray(record.removingReplicas()),
+                Replicas.toArray(record.addingReplicas()),
+                record.leader(),
+                record.leaderEpoch(),
+                record.partitionEpoch());
+        }
+
+        PartitionControlInfo(int[] replicas, int[] isr, int[] removingReplicas,
+                             int[] addingReplicas, int leader, int leaderEpoch,
+                             int partitionEpoch) {
+            this.replicas = replicas;
+            this.isr = isr;
+            this.removingReplicas = removingReplicas;
+            this.addingReplicas = addingReplicas;
+            this.leader = leader;
+            this.leaderEpoch = leaderEpoch;
+            this.partitionEpoch = partitionEpoch;
+        }
+
+        PartitionControlInfo merge(PartitionChangeRecord record) {
+            return new PartitionControlInfo(replicas,
+                Replicas.toArray(record.isr()),
+                removingReplicas,
+                addingReplicas,
+                record.leader(),
+                record.leaderEpoch(),
+                record.partitionEpoch());
+        }
+
+        String diff(PartitionControlInfo prev) {
+            StringBuilder builder = new StringBuilder();
+            String prefix = "";
+            if (!Arrays.equals(replicas, prev.replicas)) {
+                builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+                prefix = ", ";
+                builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
+            }
+            if (!Arrays.equals(isr, prev.isr)) {
+                builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+                prefix = ", ";
+                builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
+            }
+            if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
+                builder.append(prefix).append("oldRemovingReplicas=").
+                    append(Arrays.toString(prev.removingReplicas));
+                prefix = ", ";
+                builder.append(prefix).append("newRemovingReplicas=").
+                    append(Arrays.toString(removingReplicas));
+            }
+            if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
+                builder.append(prefix).append("oldAddingReplicas=").
+                    append(Arrays.toString(prev.addingReplicas));
+                prefix = ", ";
+                builder.append(prefix).append("newAddingReplicas=").
+                    append(Arrays.toString(addingReplicas));
+            }
+            if (leader != prev.leader) {
+                builder.append(prefix).append("oldLeader=").append(prev.leader);
+                prefix = ", ";
+                builder.append(prefix).append("newLeader=").append(leader);
+            }
+            if (leaderEpoch != prev.leaderEpoch) {
+                builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+                prefix = ", ";
+                builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
+            }
+            if (partitionEpoch != prev.partitionEpoch) {
+                builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
+                prefix = ", ";
+                builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+            }
+            return builder.toString();
+        }
+
+        @Override
+        public int hashCode() {
+            // Hash the array contents rather than the array identities, so that
+            // hashCode is consistent with the content-based equals below.
+            return Objects.hash(Arrays.hashCode(replicas), Arrays.hashCode(isr),
+                Arrays.hashCode(removingReplicas), Arrays.hashCode(addingReplicas),
+                leader, leaderEpoch, partitionEpoch);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof PartitionControlInfo)) return false;
+            PartitionControlInfo other = (PartitionControlInfo) o;
+            return diff(other).isEmpty();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder("PartitionControlInfo(");
+            builder.append("replicas=").append(Arrays.toString(replicas));
+            builder.append(", isr=").append(Arrays.toString(isr));
+            builder.append(", 
removingReplicas=").append(Arrays.toString(removingReplicas));
+            builder.append(", 
addingReplicas=").append(Arrays.toString(addingReplicas));
+            builder.append(", leader=").append(leader);
+            builder.append(", leaderEpoch=").append(leaderEpoch);
+            builder.append(", partitionEpoch=").append(partitionEpoch);
+            builder.append(")");
+            return builder.toString();
+        }
+    }
+
+    private final SnapshotRegistry snapshotRegistry;
+    private final Logger log;
+
+    /**
+     * The random number generator used by this object.
+     */
+    private final Random random;
+
+    /**
+     * The KIP-464 default replication factor that is used if a CreateTopics request does
+     * not specify one.
+     */
+    private final short defaultReplicationFactor;
+
+    /**
+     * The KIP-464 default number of partitions that is used if a CreateTopics request does
+     * not specify a number of partitions.
+     */
+    private final int defaultNumPartitions;
+
+    /**
+     * A reference to the controller's configuration control manager.
+     */
+    private final ConfigurationControlManager configurationControl;
+
+    /**
+     * A reference to the controller's cluster control manager.
+     */
+    final ClusterControlManager clusterControl;
+
+    /**
+     * Maps topic names to topic UUIDs.
+     */
+    private final TimelineHashMap<String, Uuid> topicsByName;
+
+    /**
+     * Maps topic UUIDs to structures containing topic information, including partitions.
+     */
+    private final TimelineHashMap<Uuid, TopicControlInfo> topics;
+
+    /**
+     * A map of broker IDs to the partitions that the broker is in the ISR for.
+     */
+    private final BrokersToIsrs brokersToIsrs;
+
+    ReplicationControlManager(SnapshotRegistry snapshotRegistry,
+                              LogContext logContext,
+                              Random random,
+                              short defaultReplicationFactor,
+                              int defaultNumPartitions,
+                              ConfigurationControlManager configurationControl,
+                              ClusterControlManager clusterControl) {
+        this.snapshotRegistry = snapshotRegistry;
+        this.log = logContext.logger(ReplicationControlManager.class);
+        this.random = random;
+        this.defaultReplicationFactor = defaultReplicationFactor;
+        this.defaultNumPartitions = defaultNumPartitions;
+        this.configurationControl = configurationControl;
+        this.clusterControl = clusterControl;
+        this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
+    }
+
+    public void replay(TopicRecord record) {
+        topicsByName.put(record.name(), record.topicId());
+        topics.put(record.topicId(), new TopicControlInfo(snapshotRegistry, record.topicId()));
+        log.info("Created topic {} with ID {}.", record.name(), record.topicId());
+    }
+
+    public void replay(PartitionRecord record) {
+        TopicControlInfo topicInfo = topics.get(record.topicId());
+        if (topicInfo == null) {
+            throw new RuntimeException("Tried to create partition " + 
record.topicId() +
+                ":" + record.partitionId() + ", but no topic with that ID was 
found.");
+        }
+        PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
+        PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId());
+        if (prevPartInfo == null) {
+            log.info("Created partition {}:{} with {}.", record.topicId(),
+                record.partitionId(), newPartInfo.toString());
+            topicInfo.parts.put(record.partitionId(), newPartInfo);
+            brokersToIsrs.update(record.topicId(), record.partitionId(), null,
+                newPartInfo.isr, -1, newPartInfo.leader);
+        } else {
+            String diff = newPartInfo.diff(prevPartInfo);
+            if (!diff.isEmpty()) {
+                log.info("Modified partition {}:{}: {}.", record.topicId(),
+                    record.partitionId(), diff);
+                topicInfo.parts.put(record.partitionId(), newPartInfo);
+                brokersToIsrs.update(record.topicId(), record.partitionId(),
+                    prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader,
+                    newPartInfo.leader);
+            }
+        }
+    }
+
+    public void replay(PartitionChangeRecord record) {
+        TopicControlInfo topicInfo = topics.get(record.topicId());
+        if (topicInfo == null) {
+            throw new RuntimeException("Tried to create partition " + 
record.topicId() +
+                ":" + record.partitionId() + ", but no topic with that ID was 
found.");
+        }
+        PartitionControlInfo prevPartitionInfo = 
topicInfo.parts.get(record.partitionId());
+        if (prevPartitionInfo == null) {
+            throw new RuntimeException("Tried to create partition " + 
record.topicId() +
+                ":" + record.partitionId() + ", but no partition with that id 
was found.");
+        }
+        PartitionControlInfo newPartitionInfo = 
prevPartitionInfo.merge(record);
+        topicInfo.parts.put(record.partitionId(), newPartitionInfo);
+        brokersToIsrs.update(record.topicId(), record.partitionId(),
+            prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader,
+            newPartitionInfo.leader);
+        log.debug("Applied ISR change record: {}", record.toString());
+    }
+
+    ControllerResult<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        Map<String, ApiError> topicErrors = new HashMap<>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        // Check the topic names.
+        validateNewTopicNames(topicErrors, request.topics());
+
+        // Identify topics that already exist and mark them with the appropriate error.
+        request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
+                .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));
+
+        // Verify that the configurations for the new topics are OK, and figure out what
+        // ConfigRecords should be created.
+        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
+            computeConfigChanges(topicErrors, request.topics());
+        ControllerResult<Map<ConfigResource, ApiError>> configResult =
+            configurationControl.incrementalAlterConfigs(configChanges);
+        for (Entry<ConfigResource, ApiError> entry : configResult.response().entrySet()) {
+            if (entry.getValue().isFailure()) {
+                topicErrors.put(entry.getKey().name(), entry.getValue());
+            }
+        }
+        records.addAll(configResult.records());
+
+        // Try to create whatever topics are needed.
+        Map<String, CreatableTopicResult> successes = new HashMap<>();
+        for (CreatableTopic topic : request.topics()) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            ApiError error = createTopic(topic, records, successes);
+            if (error.isFailure()) {
+                topicErrors.put(topic.name(), error);
+            }
+        }
+
+        // Create responses for all topics.
+        CreateTopicsResponseData data = new CreateTopicsResponseData();
+        StringBuilder resultsBuilder = new StringBuilder();
+        String resultsPrefix = "";
+        for (CreatableTopic topic : request.topics()) {
+            ApiError error = topicErrors.get(topic.name());
+            if (error != null) {
+                data.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(error.error().code()).
+                    setErrorMessage(error.message()));
+                resultsBuilder.append(resultsPrefix).append(topic).append(": ").
+                    append(error.error()).append(" (").append(error.message()).append(")");
+                resultsPrefix = ", ";
+                continue;
+            }
+            CreatableTopicResult result = successes.get(topic.name());
+            data.topics().add(result);
+            resultsBuilder.append(resultsPrefix).append(topic).append(": ").
+                append("SUCCESS");
+            resultsPrefix = ", ";
+        }
+        log.info("createTopics result(s): {}", resultsBuilder.toString());
+        return new ControllerResult<>(records, data);
+    }
+
+    private ApiError createTopic(CreatableTopic topic,
+                                 List<ApiMessageAndVersion> records,
+                                 Map<String, CreatableTopicResult> successes) {
+        Map<Integer, PartitionControlInfo> newParts = new HashMap<>();
+        if (!topic.assignments().isEmpty()) {
+            if (topic.replicationFactor() != -1) {
+                return new ApiError(Errors.INVALID_REQUEST,
+                    "A manual partition assignment was specified, but 
replication " +
+                    "factor was not set to -1.");
+            }
+            if (topic.numPartitions() != -1) {
+                return new ApiError(Errors.INVALID_REQUEST,
+                    "A manual partition assignment was specified, but 
numPartitions " +
+                        "was not set to -1.");
+            }
+            for (CreatableReplicaAssignment assignment : topic.assignments()) {
+                if (newParts.containsKey(assignment.partitionIndex())) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                        "Found multiple manual partition assignments for 
partition " +
+                            assignment.partitionIndex());
+                }
+                HashSet<Integer> brokerIds = new HashSet<>();
+                for (int brokerId : assignment.brokerIds()) {
+                    if (!brokerIds.add(brokerId)) {
+                        return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "The manual partition assignment specifies the 
same node " +
+                                "id more than once.");
+                    } else if (!clusterControl.unfenced(brokerId)) {
+                        return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                            "The manual partition assignment contains node " + 
brokerId +
+                                ", but that node is not usable.");
+                    }
+                }
+                int[] replicas = new int[assignment.brokerIds().size()];
+                for (int i = 0; i < replicas.length; i++) {
+                    replicas[i] = assignment.brokerIds().get(i);
+                }
+                // The initial ISR is the full replica set.
+                int[] isr = replicas.clone();
+                newParts.put(assignment.partitionIndex(),
+                    new PartitionControlInfo(replicas, isr, null, null, isr[0], 0, 0));
+            }
+        } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
+            return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
+                "Replication factor was set to an invalid non-positive value.");
+        } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
+            return new ApiError(Errors.INVALID_PARTITIONS,
+                "Number of partitions was set to an invalid non-positive value.");
+        } else {
+            int numPartitions = topic.numPartitions() == -1 ?
+                defaultNumPartitions : topic.numPartitions();
+            short replicationFactor = topic.replicationFactor() == -1 ?
+                defaultReplicationFactor : topic.replicationFactor();
+            try {
+                List<List<Integer>> replicas = clusterControl.
+                    placeReplicas(numPartitions, replicationFactor);
+                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
+                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                    newParts.put(partitionId,
+                        new PartitionControlInfo(r, r, null, null, r[0], 0, 0));
+                }
+            } catch (InvalidReplicationFactorException e) {
+                return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
+                    "Unable to replicate the partition " + replicationFactor +
+                        " times: " + e.getMessage());
+            }
+        }
+        Uuid topicId = new Uuid(random.nextLong(), random.nextLong());
+        successes.put(topic.name(), new CreatableTopicResult().
+            setName(topic.name()).
+            setTopicId(topicId).
+            setErrorCode((short) 0).
+            setErrorMessage(null).
+            setNumPartitions(newParts.size()).
+            setReplicationFactor((short) newParts.get(0).replicas.length));
+        records.add(new ApiMessageAndVersion(new TopicRecord().
+            setName(topic.name()).
+            setTopicId(topicId), (short) 0));
+        for (Entry<Integer, PartitionControlInfo> partEntry : newParts.entrySet()) {
+            int partitionIndex = partEntry.getKey();
+            PartitionControlInfo info = partEntry.getValue();
+            records.add(new ApiMessageAndVersion(new PartitionRecord().
+                setPartitionId(partitionIndex).
+                setTopicId(topicId).
+                setReplicas(Replicas.toList(info.replicas)).
+                setIsr(Replicas.toList(info.isr)).
+                setRemovingReplicas(null).
+                setAddingReplicas(null).
+                setLeader(info.leader).
+                setLeaderEpoch(info.leaderEpoch).
+                setPartitionEpoch(0), (short) 0));
+        }
+        return ApiError.NONE;
+    }
+
+    static void validateNewTopicNames(Map<String, ApiError> topicErrors,
+                                      CreatableTopicCollection topics) {
+        for (CreatableTopic topic : topics) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            try {
+                Topic.validate(topic.name());
+            } catch (InvalidTopicException e) {
+                topicErrors.put(topic.name(),
+                    new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage()));
+            }
+        }
+    }
+
+    static Map<ConfigResource, Map<String, Entry<OpType, String>>>
+            computeConfigChanges(Map<String, ApiError> topicErrors,
+                                 CreatableTopicCollection topics) {
+        Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges = new HashMap<>();
+        for (CreatableTopic topic : topics) {
+            if (topicErrors.containsKey(topic.name())) continue;
+            Map<String, Entry<OpType, String>> topicConfigs = new HashMap<>();
+            for (CreateTopicsRequestData.CreateableTopicConfig config : topic.configs()) {
+                topicConfigs.put(config.name(), new SimpleImmutableEntry<>(SET, config.value()));
+            }
+            if (!topicConfigs.isEmpty()) {
+                configChanges.put(new ConfigResource(TOPIC, topic.name()), topicConfigs);
+            }
+        }
+        return configChanges;
+    }
+
+    // VisibleForTesting
+    PartitionControlInfo getPartition(Uuid topicId, int partitionId) {
+        TopicControlInfo topic = topics.get(topicId);
+        if (topic == null) {
+            return null;
+        }
+        return topic.parts.get(partitionId);
+    }
+
+    // VisibleForTesting
+    BrokersToIsrs brokersToIsrs() {
+        return brokersToIsrs;
+    }
+
+    ControllerResult<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
+        AlterIsrResponseData response = new AlterIsrResponseData();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (AlterIsrRequestData.TopicData topicData : request.topics()) {
+            AlterIsrResponseData.TopicData responseTopicData =
+                new AlterIsrResponseData.TopicData().setName(topicData.name());
+            response.topics().add(responseTopicData);
+            Uuid topicId = topicsByName.get(topicData.name());
+            if (topicId == null || !topics.containsKey(topicId)) {
+                for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                }
+                continue;
+            }
+            TopicControlInfo topic = topics.get(topicId);
+            for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
+                PartitionControlInfo partition = topic.parts.get(partitionData.partitionIndex());
+                if (partition == null) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                    continue;
+                }
+                if (partitionData.leaderEpoch() != partition.leaderEpoch) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+                    continue;
+                }
+                if (partitionData.currentIsrVersion() != partition.partitionEpoch) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
+                    continue;
+                }
+                int[] newIsr = Replicas.toArray(partitionData.newIsr());
+                if (!Replicas.validateIsr(partition.replicas, newIsr)) {
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                    continue;
+                }
+                if (!Replicas.contains(newIsr, partition.leader)) {
+                    // An alterIsr request can't remove the current leader.
+                    responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
+                        setPartitionIndex(partitionData.partitionIndex()).
+                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                    continue;
+                }
+                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                    setPartitionId(partitionData.partitionIndex()).
+                    setTopicId(topic.id).
+                    setIsr(partitionData.newIsr()).
+                    setLeader(partition.leader).
+                    setLeaderEpoch(partition.leaderEpoch).
+                    setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+            }
+        }
+        return new ControllerResult<>(records, response);
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker being fenced.
+     *
+     * First, we remove this broker from any non-singleton ISR. Then we generate a
+     * FenceBrokerRecord.
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
+        BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+        if (brokerRegistration == null) {
+            throw new RuntimeException("Can't find broker registration for broker " + brokerId);
+        }
+        handleNodeDeactivated(brokerId, records);
+        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+            setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker being unregistered.
+     *
+     * First, we remove this broker from any non-singleton ISR. Then we generate an
+     * UnregisterBrokerRecord.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerUnregistered(int brokerId, long brokerEpoch,
+                                  List<ApiMessageAndVersion> records) {
+        handleNodeDeactivated(brokerId, records);
+        records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
+            setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0));
+    }
+
+    /**
+     * Handle a broker being deactivated. This means we remove it from any ISR that has
+     * more than one element. We do not remove the broker from ISRs where it is the only
+     * member since this would preclude clean leader election in the future.
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+    void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
+        Iterator<TopicPartition> iterator = brokersToIsrs.iterator(brokerId, false);
+        while (iterator.hasNext()) {
+            TopicPartition topicPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + 
topicPartition.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicPartition +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
+            int newLeader, newLeaderEpoch;
+            if (newIsr.length == 0) {
+                // We don't want to shrink the ISR to size 0. So, leave the node in the
+                // ISR, but set the leader to -1 (no leader).
+                newIsr = partition.isr;
+                newLeader = -1;
+                newLeaderEpoch = partition.leaderEpoch + 1;
+            } else if (partition.leader == brokerId) {
+                // The fenced node will no longer be the leader.
+                newLeader = chooseNewLeader(partition, newIsr, false);
+                newLeaderEpoch = partition.leaderEpoch + 1;
+            } else {
+                // The fenced node wasn't the leader, so no leader change is needed.
+                newLeader = partition.leader;
+                newLeaderEpoch = partition.leaderEpoch;
+            }
+            records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                setPartitionId(topicPartition.partitionId()).
+                setTopicId(topic.id).
+                setIsr(Replicas.toList(newIsr)).
+                setLeader(newLeader).
+                setLeaderEpoch(newLeaderEpoch).
+                setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+        }
+    }
+
+    /**
+     * Generate the appropriate records to handle a broker becoming unfenced.
+     *
+     * First, we create an UnfenceBrokerRecord. Then, we check if there are any
+     * partitions that don't currently have a leader that should be led by the newly
+     * unfenced broker.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
+        records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
+            setId(brokerId).setEpoch(brokerEpoch), (short) 0));
+        handleNodeActivated(brokerId, records);
+    }
+
+    /**
+     * Handle a broker being activated. This means we check if it can become the leader
+     * for any partition that currently has no leader (aka offline partition).
+     *
+     * @param brokerId      The broker id.
+     * @param records       The record list to append to.
+     */
+    void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
+        Iterator<TopicPartition> iterator = brokersToIsrs.noLeaderIterator();
+        while (iterator.hasNext()) {
+            TopicPartition topicPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + 
topicPartition.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicPartition +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            // TODO: if this partition is configured for unclean leader election,
+            // check the replica set rather than the ISR.
+            if (Replicas.contains(partition.isr, brokerId)) {
+                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                    setPartitionId(topicPartition.partitionId()).
+                    setTopicId(topic.id).
+                    setIsr(Replicas.toList(partition.isr)).
+                    setLeader(brokerId).
+                    setLeaderEpoch(partition.leaderEpoch + 1).
+                    setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+            }
+        }
+    }
+
+    void removeLeaderships(int brokerId, List<ApiMessageAndVersion> records) {
+        Iterator<TopicPartition> iterator = brokersToIsrs.iterator(brokerId, true);
+        while (iterator.hasNext()) {
+            TopicPartition topicPartition = iterator.next();
+            TopicControlInfo topic = topics.get(topicPartition.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + 
topicPartition.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicPartition.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicPartition +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] isrWithoutCurLeader = Replicas.copyWithout(partition.isr, brokerId);
+            int newLeader = chooseNewLeader(partition, isrWithoutCurLeader, false);
+            records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+                setPartitionId(topicPartition.partitionId()).
+                setTopicId(topic.id).
+                setIsr(Replicas.toList(partition.isr)).
+                setLeader(newLeader).
+                setLeaderEpoch(partition.leaderEpoch + 1).
+                setPartitionEpoch(partition.partitionEpoch + 1), (short) 0));
+        }
+    }
+
+    ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
+        boolean unclean = electionIsUnclean(request.electionType());

Review comment:
       I think we need to handle preferred leader election in a special way. 
For example, if the assigned replicas are 1,2,3, isr is 2,3 and the current 
leader is 3, when doing preferred leader election, we want to keep the leader 
as 3 instead of changing it to 2.
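
       A hedged sketch of that special case (illustrative names; neither
       electPreferredLeader nor the array helper below is part of this PR):

```java
// Preferred leader election: elect replicas[0] only when it is in the ISR;
// otherwise keep the current leader rather than falling back to the first
// ISR member. With replicas = {1, 2, 3}, isr = {2, 3}, and leader = 3, the
// preferred replica 1 is out of sync, so the leader stays 3.
static int electPreferredLeader(int[] replicas, int[] isr, int currentLeader) {
    int preferred = replicas[0];
    return contains(isr, preferred) ? preferred : currentLeader;
}

static boolean contains(int[] brokers, int brokerId) {
    for (int b : brokers) {
        if (b == brokerId) return true;
    }
    return false;
}
```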

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+
+public final class MockControllerMetrics implements ControllerMetrics {

Review comment:
       This class seems never used?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+
+public class ConfigurationControlManager {
+    private final Logger log;
+    private final SnapshotRegistry snapshotRegistry;
+    private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;
+
+    ConfigurationControlManager(LogContext logContext,
+                                SnapshotRegistry snapshotRegistry,
+                                Map<ConfigResource.Type, ConfigDef> configDefs) {
+        this.log = logContext.logger(ConfigurationControlManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.configDefs = configDefs;
+        this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Determine the result of applying a batch of incremental configuration changes.  Note
+     * that this method does not change the contents of memory.  It just generates a
+     * result, that you can replay later if you wish using replay().
+     *
+     * Note that there can only be one result per ConfigResource.  So if you try to modify
+     * several keys and one modification fails, the whole ConfigResource fails and nothing gets
+     * changed.
+     *
+     * @param configChanges     Maps each resource to a map from config keys to
+     *                          operation data.
+     * @return                  The result.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        Map<ConfigResource, ApiError> outputResults = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
+                configChanges.entrySet()) {
+            incrementalAlterConfigResource(resourceEntry.getKey(),
+                resourceEntry.getValue(),
+                outputRecords,
+                outputResults);
+        }
+        return new ControllerResult<>(outputRecords, outputResults);
+    }
+
+    private void incrementalAlterConfigResource(ConfigResource configResource,
+                                                Map<String, Entry<OpType, String>> keysToOps,
+                                                List<ApiMessageAndVersion> outputRecords,
+                                                Map<ConfigResource, ApiError> outputResults) {
+        ApiError error = checkConfigResource(configResource);
+        if (error.isFailure()) {
+            outputResults.put(configResource, error);
+            return;
+        }
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        for (Entry<String, Entry<OpType, String>> keysToOpsEntry : keysToOps.entrySet()) {
+            String key = keysToOpsEntry.getKey();
+            String currentValue = null;
+            TimelineHashMap<String, String> currentConfigs = configData.get(configResource);
+            if (currentConfigs != null) {
+                currentValue = currentConfigs.get(key);
+            }
+            String newValue = currentValue;
+            Entry<OpType, String> opTypeAndNewValue = keysToOpsEntry.getValue();
+            OpType opType = opTypeAndNewValue.getKey();
+            String opValue = opTypeAndNewValue.getValue();
+            switch (opType) {
+                case SET:
+                    newValue = opValue;
+                    break;
+                case DELETE:
+                    if (opValue != null) {
+                        outputResults.put(configResource, new ApiError(
+                            Errors.INVALID_REQUEST, "A DELETE op was given 
with a " +
+                            "non-null value."));
+                        return;
+                    }
+                    newValue = null;
+                    break;
+                case APPEND:
+                case SUBTRACT:
+                    if (!isSplittable(configResource.type(), key)) {
+                        outputResults.put(configResource, new ApiError(
+                            Errors.INVALID_CONFIG, "Can't " + opType + " to " +
+                            "key " + key + " because its type is not LIST."));
+                        return;
+                    }
+                    List<String> newValueParts = getParts(newValue, key, configResource);
+                    if (opType == APPEND) {
+                        if (!newValueParts.contains(opValue)) {
+                            newValueParts.add(opValue);
+                        }
+                        newValue = String.join(",", newValueParts);
+                    } else if (newValueParts.remove(opValue)) {
+                        newValue = String.join(",", newValueParts);
+                    }
+                    break;
+            }
+            if (!Objects.equals(currentValue, newValue)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(newValue), (short) 0));
+            }
+        }
+        outputRecords.addAll(newRecords);
+        outputResults.put(configResource, ApiError.NONE);
+    }
+
+    /**
+     * Determine the result of applying a batch of legacy configuration changes.  Note
+     * that this method does not change the contents of memory.  It just generates a
+     * result, that you can replay later if you wish using replay().
+     *
+     * @param newConfigs        The new configurations to install for each resource.
+     *                          All existing configurations will be overwritten.
+     * @return                  The result.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+        Map<ConfigResource, Map<String, String>> newConfigs) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        Map<ConfigResource, ApiError> outputResults = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, String>> resourceEntry :
+            newConfigs.entrySet()) {
+            legacyAlterConfigResource(resourceEntry.getKey(),
+                resourceEntry.getValue(),
+                outputRecords,
+                outputResults);
+        }
+        return new ControllerResult<>(outputRecords, outputResults);
+    }
+
+    private void legacyAlterConfigResource(ConfigResource configResource,
+                                           Map<String, String> newConfigs,
+                                           List<ApiMessageAndVersion> outputRecords,
+                                           Map<ConfigResource, ApiError> outputResults) {
+        ApiError error = checkConfigResource(configResource);
+        if (error.isFailure()) {
+            outputResults.put(configResource, error);
+            return;
+        }
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        Map<String, String> currentConfigs = configData.get(configResource);
+        if (currentConfigs == null) {
+            currentConfigs = Collections.emptyMap();
+        }
+        for (Entry<String, String> entry : newConfigs.entrySet()) {
+            String key = entry.getKey();
+            String newValue = entry.getValue();
+            String currentValue = currentConfigs.get(key);
+            if (!Objects.equals(newValue, currentValue)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(newValue), (short) 0));
+            }
+        }
+        for (String key : currentConfigs.keySet()) {
+            if (!newConfigs.containsKey(key)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(null), (short) 0));
+            }
+        }
+        outputRecords.addAll(newRecords);
+        outputResults.put(configResource, ApiError.NONE);
+    }
+
+    private List<String> getParts(String value, String key, ConfigResource configResource) {
+        if (value == null) {
+            value = getConfigValueDefault(configResource.type(), key);
+        }
+        List<String> parts = new ArrayList<>();
+        if (value == null) {
+            return parts;
+        }
+        String[] splitValues = value.split(",");
+        for (String splitValue : splitValues) {
+            if (!splitValue.isEmpty()) {
+                parts.add(splitValue);
+            }
+        }
+        return parts;
+    }
+
+    static ApiError checkConfigResource(ConfigResource configResource) {
+        switch (configResource.type()) {
+            case BROKER_LOGGER:
+                // We do not handle resources of type BROKER_LOGGER in
+                // ConfigurationControlManager, since they are not persisted to the
+                // metadata log.
+                //
+                // When using incrementalAlterConfigs, we handle changes to BROKER_LOGGER
+                // in ControllerApis.scala.  When using the legacy alterConfigs,
+                // BROKER_LOGGER is not supported at all.
+                return new ApiError(Errors.INVALID_REQUEST, "Unsupported " +
+                    "configuration resource type BROKER_LOGGER.");
+            case BROKER:
+                if (!configResource.name().isEmpty()) {
+                    try {
+                        int brokerId = Integer.parseInt(configResource.name());
+                        if (brokerId < 0) {
+                            return new ApiError(Errors.INVALID_REQUEST, "Illegal " +
+                                "negative broker ID in BROKER resource.");
+                        }
+                    } catch (NumberFormatException e) {
+                        return new ApiError(Errors.INVALID_REQUEST, "Illegal " 
+
+                            "non-integral BROKER resource type name.");
+                    }
+                }
+                return ApiError.NONE;
+            case TOPIC:
+                if (!configResource.name().isEmpty()) {

Review comment:
       An empty topic name currently results in an INVALID_REQUEST error.
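
       A hedged sketch of that behavior (the helper name and error message are
       illustrative, not code from this PR; ApiError and Errors are the same
       Kafka classes used in the hunk above):

```java
// Hypothetical helper: an empty name on a TOPIC config resource is rejected
// with INVALID_REQUEST before any per-topic name validation runs.
static ApiError checkTopicResourceName(String name) {
    if (name.isEmpty()) {
        return new ApiError(Errors.INVALID_REQUEST,
            "Empty topic name in TOPIC config resource.");
    }
    return ApiError.NONE;
}
```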

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -0,0 +1,367 @@
+            case BROKER:
+                if (!configResource.name().isEmpty()) {

Review comment:
       An empty broker name currently results in an INVALID_REQUEST error.
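
       For illustration, a sketch of how the BROKER branch here could mirror
       that behavior (the exact error message is an assumption, not code from
       this PR):

           case BROKER:
               if (configResource.name().isEmpty()) {
                   return new ApiError(Errors.INVALID_REQUEST,
                       "Empty broker name is not permitted.");
               }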

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class QuorumControllerTestEnv implements AutoCloseable {
+    private static final Logger log =
+        LoggerFactory.getLogger(QuorumControllerTestEnv.class);
+
+    private final List<QuorumController> controllers;
+
+    public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
+                                   Consumer<QuorumController.Builder> builderConsumer)
+                                   throws Exception {
+        int numControllers = logEnv.logManagers().size();
+        this.controllers = new ArrayList<>(numControllers);
+        try {
+            for (int i = 0; i < numControllers; i++) {
+                QuorumController.Builder builder = new QuorumController.Builder(i);
+                builder.setLogManager(logEnv.logManagers().get(i));
+                builderConsumer.accept(builder);
+                this.controllers.add(builder.build());
+            }
+        } catch (Exception e) {
+            close();
+            throw e;
+        }
+    }
+
+    QuorumController activeController() throws InterruptedException {
+        AtomicReference<QuorumController> value = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            QuorumController activeController = null;
+            for (QuorumController controller : controllers) {
+                long curEpoch = controller.curClaimEpoch();
+                if (curEpoch != -1) {

Review comment:
       To make this more intuitive, perhaps we could add an isActive() method to QuorumController?
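
       Something like this sketch, perhaps (the method name follows the
       suggestion above; the body just lifts the existing epoch check from the
       loop here):

           // In QuorumController: true only if this node is currently the
           // active controller.
           boolean isActive() {
               return curClaimEpoch() != -1;
           }

       The loop in activeController() could then read
       if (controller.isActive()) instead of comparing epochs directly.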

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.BrokersToIsrs.TopicPartition;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
+import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerTest {
+    private static final Logger log =
+        LoggerFactory.getLogger(QuorumControllerTest.class);
+
+    /**
+     * Test creating a new QuorumController and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, __ -> { })) {
+            }
+        }
+    }
+
+    /**
+     * Test setting some configuration values and reading them back.
+     */
+    @Test
+    public void testConfigurationOperations() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                testConfigurationOperations(controlEnv.activeController());
+            }
+        }
+    }
+
+    private void testConfigurationOperations(QuorumController controller) throws Throwable {
+        assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+            controller.incrementalAlterConfigs(Collections.singletonMap(
+                BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), true).get());
+        assertEquals(Collections.singletonMap(BROKER0,
+            new ResultOrError<>(Collections.emptyMap())),
+            controller.describeConfigs(Collections.singletonMap(
+                BROKER0, Collections.emptyList())).get());
+        assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
+            controller.incrementalAlterConfigs(Collections.singletonMap(
+                BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false).get());
+        assertEquals(Collections.singletonMap(BROKER0, new ResultOrError<>(Collections.
+                singletonMap("baz", "123"))),
+            controller.describeConfigs(Collections.singletonMap(
+                BROKER0, Collections.emptyList())).get());
+    }
+
+    /**
+     * Test that an incrementalAlterConfigs operation doesn't complete until the
+     * records can be written to the metadata log.
+     */
+    @Test
+    public void testDelayedConfigurationOperations() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
+            }
+        }
+    }
+
+    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
+                                                    QuorumController controller)
+                                                    throws Throwable {
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(0L));
+        CompletableFuture<Map<ConfigResource, ApiError>> future1 =
+            controller.incrementalAlterConfigs(Collections.singletonMap(
+                BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false);
+        assertFalse(future1.isDone());
+        assertEquals(Collections.singletonMap(BROKER0,
+            new ResultOrError<>(Collections.emptyMap())),
+            controller.describeConfigs(Collections.singletonMap(
+                BROKER0, Collections.emptyList())).get());
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
+        assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
+    }
+
+    @Test
+    public void testUnregisterBroker() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+            try (QuorumControllerTestEnv controlEnv =
+                     new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(0).
+                        setClusterId(Uuid.fromString("06B-K3N1TBCNYFgruEVP0Q")).
+                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                        setListeners(listeners));
+                assertEquals(0L, reply.get().epoch());
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new CreatableTopic().setName("foo").setNumPartitions(1).
+                                setReplicationFactor((short) 1)).iterator()));
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
+                    createTopicsRequestData).get().topics().find("foo").errorCode());

Review comment:
       Hmm, why is a replication factor of 1 invalid?
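
       (My guess: the broker has registered but not yet sent a heartbeat, so it
       is still fenced, and placeReplicas, which draws only from the unfenced
       list, sees zero usable brokers.  A sketch of that failure path, reusing
       the shapes from this PR; the no-rack function is an assumption:)

           // With no broker touch()ed as unfenced, the iterator yields nothing,
           // so even numReplicas == 1 cannot be satisfied, and the placement
           // policy presumably throws InvalidReplicationFactorException, which
           // surfaces per-topic as INVALID_REPLICATION_FACTOR.
           Iterator<UsableBroker> iterator = new UsableBrokerIterator(
               unfenced.iterator(), id -> Optional.empty());
           policy.createPlacement(1, (short) 1, iterator);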

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.UsableBroker;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.controller.BrokerControlState.FENCED;
+import static org.apache.kafka.controller.BrokerControlState.CONTROLLED_SHUTDOWN;
+import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
+import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
+
+
+/**
+ * The BrokerHeartbeatManager manages all the soft state associated with broker
+ * heartbeats.  Soft state is state which does not appear in the metadata log.  This
+ * state includes things like the last time each broker sent us a heartbeat, and
+ * whether the broker is trying to perform a controlled shutdown.
+ *
+ * Only the active controller has a BrokerHeartbeatManager, since only the active
+ * controller handles broker heartbeats.  Standby controllers will create a heartbeat
+ * manager as part of the process of activating.  This design minimizes the size of
+ * the metadata partition by excluding heartbeats from it.  However, it does mean
+ * that after a controller failover, we may take some extra time to fence brokers,
+ * since the new active controller does not know when the last heartbeats were
+ * received from each.
+ */
+public class BrokerHeartbeatManager {
+    static class BrokerHeartbeatState {
+        /**
+         * The broker ID.
+         */
+        private final int id;
+
+        /**
+         * The last time we received a heartbeat from this broker, in monotonic
+         * nanoseconds.  When this field is updated, we also may have to update the
+         * broker's position in the unfenced list.
+         */
+        long lastContactNs;
+
+        /**
+         * The last metadata offset which this broker reported.  When this field is
+         * updated, we may also have to update the broker's position in the active set.
+         */
+        long metadataOffset;
+
+        /**
+         * The offset at which the broker should complete its controlled shutdown,
+         * or -1 if the broker is not performing a controlled shutdown.  When this
+         * field is updated, we also have to update the broker's position in the
+         * shuttingDown set.
+         */
+        private long controlledShutDownOffset;
+
+        /**
+         * The previous entry in the unfenced list, or null if the broker is not in
+         * that list.
+         */
+        private BrokerHeartbeatState prev;
+
+        /**
+         * The next entry in the unfenced list, or null if the broker is not in
+         * that list.
+         */
+        private BrokerHeartbeatState next;
+
+        BrokerHeartbeatState(int id) {
+            this.id = id;
+            this.lastContactNs = 0;
+            this.prev = null;
+            this.next = null;
+            this.metadataOffset = -1;
+            this.controlledShutDownOffset = -1;
+        }
+
+        /**
+         * Returns the broker ID.
+         */
+        int id() {
+            return id;
+        }
+
+        /**
+         * Returns true only if the broker is fenced.
+         */
+        boolean fenced() {
+            return prev == null;
+        }
+
+        /**
+         * Returns true only if the broker is in controlled shutdown state.
+         */
+        boolean shuttingDown() {
+            return controlledShutDownOffset >= 0;
+        }
+    }
+
+    static class MetadataOffsetComparator implements Comparator<BrokerHeartbeatState> {
+        static final MetadataOffsetComparator INSTANCE = new MetadataOffsetComparator();
+
+        @Override
+        public int compare(BrokerHeartbeatState a, BrokerHeartbeatState b) {
+            if (a.metadataOffset < b.metadataOffset) {
+                return -1;
+            } else if (a.metadataOffset > b.metadataOffset) {
+                return 1;
+            } else if (a.id < b.id) {
+                return -1;
+            } else if (a.id > b.id) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    static class BrokerHeartbeatStateList {
+        /**
+         * The head of the list of unfenced brokers.  The list is sorted in
+         * ascending order of last contact time.
+         */
+        private final BrokerHeartbeatState head;
+
+        BrokerHeartbeatStateList() {
+            this.head = new BrokerHeartbeatState(-1);
+            head.prev = head;
+            head.next = head;
+        }
+
+        /**
+         * Return the head of the list, or null if the list is empty.
+         */
+        BrokerHeartbeatState first() {
+            BrokerHeartbeatState result = head.next;
+            return result == head ? null : result;
+        }
+
+        /**
+         * Add the broker to the list.  We start looking for a place to put it at
+         * the end of the list.
+         */
+        void add(BrokerHeartbeatState broker) {
+            BrokerHeartbeatState cur = head.prev;
+            while (true) {
+                if (cur == head || cur.lastContactNs <= broker.lastContactNs) {
+                    broker.next = cur.next;
+                    cur.next.prev = broker;
+                    broker.prev = cur;
+                    cur.next = broker;
+                    break;
+                }
+                cur = cur.prev;
+            }
+        }
+
+        /**
+         * Remove a broker from the list.
+         */
+        void remove(BrokerHeartbeatState broker) {
+            if (broker.next == null) {
+                throw new RuntimeException(broker + " is not in the list.");
+            }
+            broker.prev.next = broker.next;
+            broker.next.prev = broker.prev;
+            broker.prev = null;
+            broker.next = null;
+        }
+
+        BrokerHeartbeatStateIterator iterator() {
+            return new BrokerHeartbeatStateIterator(head);
+        }
+    }
+
+    static class BrokerHeartbeatStateIterator implements Iterator<BrokerHeartbeatState> {
+        private final BrokerHeartbeatState head;
+        private BrokerHeartbeatState cur;
+
+        BrokerHeartbeatStateIterator(BrokerHeartbeatState head) {
+            this.head = head;
+            this.cur = head;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cur.next != head;
+        }
+
+        @Override
+        public BrokerHeartbeatState next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            BrokerHeartbeatState result = cur.next;
+            cur = cur.next;
+            return result;
+        }
+    }
+
+    private final Logger log;
+
+    /**
+     * The Kafka clock object to use.
+     */
+    private final Time time;
+
+    /**
+     * The broker session timeout in nanoseconds.
+     */
+    private final long sessionTimeoutNs;
+
+    /**
+     * Maps broker IDs to heartbeat states.
+     */
+    private final HashMap<Integer, BrokerHeartbeatState> brokers;
+
+    /**
+     * The list of unfenced brokers, sorted by last contact time.
+     */
+    private final BrokerHeartbeatStateList unfenced;
+
+    /**
+     * The set of active brokers.  A broker is active if it is unfenced, and not
+     * shutting down.
+     */
+    private final TreeSet<BrokerHeartbeatState> active;
+
+    BrokerHeartbeatManager(LogContext logContext,
+                           Time time,
+                           long sessionTimeoutNs) {
+        this.log = logContext.logger(BrokerHeartbeatManager.class);
+        this.time = time;
+        this.sessionTimeoutNs = sessionTimeoutNs;
+        this.brokers = new HashMap<>();
+        this.unfenced = new BrokerHeartbeatStateList();
+        this.active = new TreeSet<>(MetadataOffsetComparator.INSTANCE);
+    }
+
+    // VisibleForTesting
+    Time time() {
+        return time;
+    }
+
+    // VisibleForTesting
+    BrokerHeartbeatStateList unfenced() {
+        return unfenced;
+    }
+
+    /**
+     * Mark a broker as fenced.
+     *
+     * @param brokerId      The ID of the broker to mark as fenced.
+     */
+    void fence(int brokerId) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker != null) {
+            untrack(broker);
+        }
+    }
+
+    /**
+     * Remove a broker.
+     *
+     * @param brokerId      The ID of the broker to remove.
+     */
+    void remove(int brokerId) {
+        BrokerHeartbeatState broker = brokers.remove(brokerId);
+        if (broker != null) {
+            untrack(broker);
+        }
+    }
+
+    /**
+     * Stop tracking the broker in the unfenced list and active set, if it was
+     * tracked in either of these.
+     *
+     * @param broker        The broker state to stop tracking.
+     */
+    private void untrack(BrokerHeartbeatState broker) {
+        if (!broker.fenced()) {
+            unfenced.remove(broker);
+            if (!broker.shuttingDown()) {
+                active.remove(broker);
+            }
+        }
+    }
+
+    /**
+     * Check if the given broker has a valid session.
+     *
+     * @param brokerId      The broker ID to check.
+     *
+     * @return              True if the given broker has a valid session.
+     */
+    boolean hasValidSession(int brokerId) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null) return false;
+        return hasValidSession(broker);
+    }
+
+    /**
+     * Check if the given broker has a valid session.
+     *
+     * @param broker        The broker to check.
+     *
+     * @return              True if the given broker has a valid session.
+     */
+    private boolean hasValidSession(BrokerHeartbeatState broker) {
+        if (broker.fenced()) {
+            return false;
+        } else {
+            return broker.lastContactNs + sessionTimeoutNs >= time.nanoseconds();
+        }
+    }
+
+    /**
+     * Update broker state, including lastContactNs.
+     *
+     * @param brokerId          The broker ID.
+     * @param fenced            True only if the broker is currently fenced.
+     * @param metadataOffset    The latest metadata offset of the broker.
+     */
+    void touch(int brokerId, boolean fenced, long metadataOffset) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null) {
+            broker = new BrokerHeartbeatState(brokerId);
+            brokers.put(brokerId, broker);
+        } else {
+            // Remove the broker from the unfenced list and/or the active set.  Its
+            // position in either of those data structures depends on values we are
+            // changing here.  We will re-add it if necessary at the end of this
+            // function.
+            untrack(broker);
+        }
+        broker.lastContactNs = time.nanoseconds();
+        broker.metadataOffset = metadataOffset;
+        if (fenced) {
+            // If a broker is fenced, it leaves controlled shutdown.  On its next
+            // heartbeat, it will shut down immediately.
+            broker.controlledShutDownOffset = -1;
+        } else {
+            unfenced.add(broker);
+            if (!broker.shuttingDown()) {
+                active.add(broker);
+            }
+        }
+    }
+
+    long lowestActiveOffset() {
+        Iterator<BrokerHeartbeatState> iterator = active.iterator();
+        if (!iterator.hasNext()) {
+            return Long.MAX_VALUE;
+        }
+        BrokerHeartbeatState first = iterator.next();
+        return first.metadataOffset;
+    }
+
+    /**
+     * Mark a broker as being in the controlled shutdown state.
+     *
+     * @param brokerId                  The broker id.
+     * @param controlledShutDownOffset  The offset at which controlled shutdown will
+     *                                  be complete.
+     */
+    void updateControlledShutdownOffset(int brokerId, long controlledShutDownOffset) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null) {
+            throw new RuntimeException("Unable to locate broker " + brokerId);
+        }
+        if (broker.fenced()) {
+            throw new RuntimeException("Fenced brokers cannot enter controlled 
shutdown.");
+        }
+        active.remove(broker);
+        broker.controlledShutDownOffset = controlledShutDownOffset;
+        log.debug("Updated the controlled shutdown offset for broker {} to 
{}.",
+            brokerId, controlledShutDownOffset);
+    }
+
+    /**
+     * Return the time in monotonic nanoseconds at which we should check if a broker
+     * session needs to be expired.
+     */
+    long nextCheckTimeNs() {
+        BrokerHeartbeatState broker = unfenced.first();
+        if (broker == null) {
+            return Long.MAX_VALUE;
+        } else {
+            return broker.lastContactNs + sessionTimeoutNs;
+        }
+    }
+
+    /**
+     * Find the stale brokers which haven't sent a heartbeat in too long, and which
+     * need to be fenced.
+     *
+     * @return      A list of node IDs.
+     */
+    List<Integer> findStaleBrokers() {
+        List<Integer> nodes = new ArrayList<>();
+        BrokerHeartbeatStateIterator iterator = unfenced.iterator();
+        while (iterator.hasNext()) {
+            BrokerHeartbeatState broker = iterator.next();
+            if (hasValidSession(broker)) {
+                break;
+            }
+            nodes.add(broker.id);
+        }
+        return nodes;
+    }
+
+    /**
+     * Place replicas on unfenced brokers.
+     *
+     * @param numPartitions     The number of partitions to place.
+     * @param numReplicas       The number of replicas for each partition.
+     * @param idToRack          A function mapping broker id to broker rack.
+     * @param policy            The replica placement policy to use.
+     *
+     * @return                  A list of replica lists.
+     *
+     * @throws InvalidReplicationFactorException    If too many replicas were requested.
+     */
+    List<List<Integer>> placeReplicas(int numPartitions, short numReplicas,
+                                      Function<Integer, Optional<String>> idToRack,
+                                      ReplicaPlacementPolicy policy) {
+        // TODO: support using fenced brokers here if necessary to get to the desired
+        // number of replicas. We probably need to add a fenced boolean in UsableBroker.
+        Iterator<UsableBroker> iterator = new UsableBrokerIterator(
+            unfenced.iterator(), idToRack);
+        return policy.createPlacement(numPartitions, numReplicas, iterator);
+    }
+
+    static class UsableBrokerIterator implements Iterator<UsableBroker> {
+        private final Iterator<BrokerHeartbeatState> iterator;
+        private final Function<Integer, Optional<String>> idToRack;
+        private UsableBroker next;
+
+        UsableBrokerIterator(Iterator<BrokerHeartbeatState> iterator,
+                             Function<Integer, Optional<String>> idToRack) {
+            this.iterator = iterator;
+            this.idToRack = idToRack;
+            this.next = null;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null) {
+                return true;
+            }
+            BrokerHeartbeatState result;
+            do {
+                if (!iterator.hasNext()) {
+                    return false;
+                }
+                result = iterator.next();
+            } while (result.shuttingDown());
+            Optional<String> rack = idToRack.apply(result.id());
+            next = new UsableBroker(result.id(), rack);
+            return true;
+        }
+
+        @Override
+        public UsableBroker next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            UsableBroker result = next;
+            next = null;
+            return result;
+        }
+    }
+
+    BrokerControlState currentBrokerState(BrokerHeartbeatState broker) {
+        if (broker.shuttingDown()) {
+            return CONTROLLED_SHUTDOWN;
+        } else if (broker.fenced()) {
+            return FENCED;
+        } else {
+            return UNFENCED;
+        }
+    }
+
+    /**
+     * Calculate the next broker state for a broker that just sent a heartbeat
+     * request.
+     *
+     * @param brokerId              The broker id.
+     * @param request               The incoming heartbeat request.
+     * @param lastCommittedOffset   The last committed offset of the quorum controller.
+     * @param hasLeaderships        A callback which evaluates to true if the broker
+     *                              leads at least one partition.
+     *
+     * @return                      The current and next broker states.
+     */
+    BrokerControlStates calculateNextBrokerState(int brokerId,
+                                                 BrokerHeartbeatRequestData request,
+                                                 long lastCommittedOffset,
+                                                 Supplier<Boolean> hasLeaderships) {
+        BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
+            new BrokerHeartbeatState(brokerId));
+        BrokerControlState currentState = currentBrokerState(broker);
+        switch (currentState) {
+            case FENCED:
+                if (request.wantShutDown()) {
+                    log.info("Fenced broker {} has requested and been granted 
an immediate " +
+                        "shutdown.", brokerId);
+                    return new BrokerControlStates(currentState, SHUTDOWN_NOW);
+                } else if (!request.wantFence()) {
+                    if (request.currentMetadataOffset() >= lastCommittedOffset) {
+                        log.info("The request from broker {} to unfence has 
been granted " +
+                                "because it has caught up with the last 
committed metadata " +
+                                "offset {}.", brokerId, lastCommittedOffset);
+                        return new BrokerControlStates(currentState, UNFENCED);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("The request from broker {} to unfence 
cannot yet " +
+                                "be granted because it has not caught up with 
the last " +
+                                "committed metadata offset {}. It is still at 
offset {}.",
+                                brokerId, lastCommittedOffset, 
request.currentMetadataOffset());
+                        }
+                        return new BrokerControlStates(currentState, FENCED);
+                    }
+                }
+                return new BrokerControlStates(currentState, FENCED);
+
+            case UNFENCED:
+                if (request.wantFence()) {
+                    if (request.wantShutDown()) {

Review comment:
       Do we allow a heartbeat request to set both the wantFence and wantShutDown flags?
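
       From the truncated UNFENCED branch above it looks like both flags
       together are handled; a sketch of what I would expect the continuation
       to look like (the exact transitions here are an assumption, not code
       from this PR):

           case UNFENCED:
               if (request.wantFence()) {
                   if (request.wantShutDown()) {
                       // Sketch: wantFence + wantShutDown together presumably
                       // means "shut down immediately".
                       return new BrokerControlStates(currentState, SHUTDOWN_NOW);
                   }
                   return new BrokerControlStates(currentState, FENCED);
               }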




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
