cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577217605
##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+    /**
+     * A builder class which creates the QuorumController.
+     */
+    static public class Builder {
+        private final int nodeId;
+        private Time time = Time.SYSTEM;
+        private String threadNamePrefix = null;
+        private LogContext logContext = null;
+        private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
+        private MetaLogManager logManager = null;
+        private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
+        private short defaultReplicationFactor = 3;
+        private int defaultNumPartitions = 1;
+        private ReplicaPlacementPolicy replicaPlacementPolicy =
+            new SimpleReplicaPlacementPolicy(new Random());
+        private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS);
+
+        public Builder(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
+        public Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder setConfigDefs(Map<ConfigResource.Type, ConfigDef> configDefs) {
+            this.configDefs = configDefs;
+            return this;
+        }
+
+        public Builder setLogManager(MetaLogManager logManager) {
+            this.logManager = logManager;
+            return this;
+        }
+
+        public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) {
+            this.supportedFeatures = supportedFeatures;
+            return this;
+        }
+
+        public Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
+            this.defaultReplicationFactor = defaultReplicationFactor;
+            return this;
+        }
+
+        public Builder setDefaultNumPartitions(int defaultNumPartitions) {
+            this.defaultNumPartitions = defaultNumPartitions;
+            return this;
+        }
+
+        public Builder setReplicaPlacementPolicy(ReplicaPlacementPolicy replicaPlacementPolicy) {
+            this.replicaPlacementPolicy = replicaPlacementPolicy;
+            return this;
+        }
+
+        public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
+            this.sessionTimeoutNs = sessionTimeoutNs;
+            return this;
+        }
+
+        public QuorumController build() throws Exception {
+            if (logManager == null) {
+                throw new RuntimeException("You must set a metadata log manager.");
+            }
+            if (threadNamePrefix == null) {
+                threadNamePrefix = String.format("Node%d_", nodeId);
+            }
+            if (logContext == null) {
+                logContext = new LogContext(String.format("[Controller %d] ", nodeId));
+            }
+            KafkaEventQueue queue = null;
+            try {
+                queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+                return new QuorumController(logContext, nodeId, queue, time, configDefs,
+                    logManager, supportedFeatures, defaultReplicationFactor,
+                    defaultNumPartitions, replicaPlacementPolicy, sessionTimeoutNs);
+            } catch (Exception e) {
+                Utils.closeQuietly(queue, "event queue");
+                throw e;
+            }
+        }
+    }
+
+    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
+        "The active controller appears to be node ";
+
+    private NotControllerException newNotControllerException() {
+        int latestController = logManager.leader().nodeId();
+        if (latestController < 0) {
+            return new NotControllerException("No controller appears to be active.");
+        } else {
+            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
+                latestController);
+        }
+    }
+
+    public static int exceptionToApparentController(NotControllerException e) {
+        if (e.getMessage().startsWith(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX)) {
+            return Integer.parseInt(e.getMessage().substring(
+                ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX.length()));
+        } else {
+            return -1;
+        }
+    }
+
+    private void handleEventEnd(String name, long startProcessingTimeNs) {
+        long endProcessingTime = time.nanoseconds();
+        long deltaNs = endProcessingTime - startProcessingTimeNs;
+        log.debug("Processed {} in {} us", name,
+            TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS));
+    }
+
+    private Throwable handleEventException(String name,
+                                           Optional<Long> startProcessingTimeNs,
+                                           Throwable exception) {
+        if (!startProcessingTimeNs.isPresent()) {
+            log.debug("unable to start processing {} because of {}.", name,
+                exception.getClass().getSimpleName());
+            if (exception instanceof ApiException) {
+                return exception;
+            } else {
+                return new UnknownServerException(exception);
+            }
+        }
+        long endProcessingTime = time.nanoseconds();
+        long deltaNs = endProcessingTime - startProcessingTimeNs.get();
+        long deltaUs = TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS);
+        if (exception instanceof ApiException) {
+            log.debug("{}: failed with {} in {} us", name,
+                exception.getClass().getSimpleName(), deltaUs);
+            return exception;
+        }
+        log.warn("{}: failed with unknown server exception {} at epoch {} in {} us. " +
+            "Reverting to last committed offset {}.",
+            this, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
+            lastCommittedOffset, exception);
+        renounce();
+        return new UnknownServerException(exception);
+    }
+
+    /**
+     * A controller event for handling internal state changes, such as Raft inputs.
+     */
+    class ControlEvent implements EventQueue.Event {
+        private final String name;
+        private final Runnable handler;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+
+        ControlEvent(String name, Runnable handler) {
+            this.name = name;
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() throws Exception {
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            log.debug("Executing {}.", this);
+            handler.run();
+            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            handleEventException(name, startProcessingTimeNs, exception);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    private void appendControlEvent(String name, Runnable handler) {
+        ControlEvent event = new ControlEvent(name, handler);
+        queue.append(event);
+    }
+
+    /**
+     * A controller event that reads the committed internal state in order to expose it
+     * to an API.
+     */
+    class ControllerReadEvent<T> implements EventQueue.Event {
+        private final String name;
+        private final CompletableFuture<T> future;
+        private final Supplier<T> handler;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+
+        ControllerReadEvent(String name, Supplier<T> handler) {
+            this.name = name;
+            this.future = new CompletableFuture<T>();
+            this.handler = handler;
+        }
+
+        CompletableFuture<T> future() {
+            return future;
+        }
+
+        @Override
+        public void run() throws Exception {
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            T value = handler.get();
+            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+            future.complete(value);
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            future.completeExceptionally(
+                handleEventException(name, startProcessingTimeNs, exception));
+        }
+
+        @Override
+        public String toString() {
+            return name + "(" + System.identityHashCode(this) + ")";
+        }
+    }
+
+    // VisibleForTesting
+    ReplicationControlManager replicationControl() {
+        return replicationControl;
+    }
+
+    // VisibleForTesting
+    <T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
+        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
+        queue.append(event);
+        return event.future();
+    }
+
+    interface ControllerWriteOperation<T> {
+        /**
+         * Generate the metadata records needed to implement this controller write
+         * operation. In general, this operation should not modify the "hard state" of
+         * the controller. That modification will happen later on, when we replay the
+         * records generated by this function.
+         *
+         * There are cases where this function modifies the "soft state" of the
+         * controller. Mainly, this happens when we process cluster heartbeats.
+         *
+         * This function also generates an RPC result. In general, if the RPC resulted in
+         * an error, the RPC result will be an error, and the generated record list will
+         * be empty. This would happen if we tried to create a topic with incorrect
+         * parameters, for example. Of course, partial errors are possible for batch
+         * operations.
+         *
+         * @return A tuple containing a list of records, and an RPC result.
+         */
+        ControllerResult<T> generateRecordsAndResult() throws Exception;
+
+        /**
+         * Once we've passed the records to the Raft layer, we will invoke this function
+         * with the end offset at which those records were placed. If there were no
+         * records to write, we'll just pass the last write offset.
+         */
+        default void processBatchEndOffset(long offset) {}
+    }
+
+    /**
+     * A controller event that modifies the controller state.
+     */
+    class ControllerWriteEvent<T> implements EventQueue.Event, DeferredEvent {
+        private final String name;
+        private final CompletableFuture<T> future;
+        private final ControllerWriteOperation<T> op;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+        private ControllerResultAndOffset<T> resultAndOffset;
+
+        ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
+            this.name = name;
+            this.future = new CompletableFuture<T>();
+            this.op = op;
+            this.resultAndOffset = null;
+        }
+
+        CompletableFuture<T> future() {
+            return future;
+        }
+
+        @Override
+        public void run() throws Exception {
+            long controllerEpoch = curClaimEpoch;
+            if (controllerEpoch == -1) {
+                throw newNotControllerException();
+            }
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            ControllerResult<T> result = op.generateRecordsAndResult();
+            if (result.records().isEmpty()) {
+                op.processBatchEndOffset(writeOffset);
+                // If the operation did not return any records, then it was actually just
+                // a read after all, and not a read + write. However, this read was done
+                // from the latest in-memory state, which might contain uncommitted data.
+                Optional<Long> maybeOffset = purgatory.highestPendingOffset();
+                if (!maybeOffset.isPresent()) {
+                    // If the purgatory is empty, there are no pending operations and no
+                    // uncommitted state. We can return immediately.
+                    this.resultAndOffset = new ControllerResultAndOffset<>(-1,
+                        new ArrayList<>(), result.response());
+                    log.debug("Completing read-only operation {} immediately because " +
+                        "the purgatory is empty.", this);
+                    complete(null);
+                    return;
+                }
+                // If there are operations in the purgatory, we want to wait for the latest
+                // one to complete before returning our result to the user.
+                this.resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
+                    result.records(), result.response());
+                log.debug("Read-only operation {} will be completed when the log " +
+                    "reaches offset {}", this, resultAndOffset.offset());
+            } else {
+                // If the operation returned a batch of records, those records need to be
+                // written before we can return our result to the user. Here, we hand off
+                // the batch of records to the metadata log manager. They will be written
+                // out asynchronously.
+                long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                op.processBatchEndOffset(offset);
+                writeOffset = offset;
+                this.resultAndOffset = new ControllerResultAndOffset<>(offset,
+                    result.records(), result.response());
+                for (ApiMessageAndVersion message : result.records()) {
+                    replay(message.message());
+                }
+                snapshotRegistry.createSnapshot(offset);
+                log.debug("Read-write operation {} will be completed when the log " +
+                    "reaches offset {}.", this, resultAndOffset.offset());
+            }
+            purgatory.add(resultAndOffset.offset(), this);
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            complete(exception);
+        }
+
+        @Override
+        public void complete(Throwable exception) {
+            if (exception == null) {
+                handleEventEnd(this.toString(), startProcessingTimeNs.get());
+                future.complete(resultAndOffset.response());
+            } else {
+                future.completeExceptionally(
+                    handleEventException(name, startProcessingTimeNs, exception));
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name + "(" + System.identityHashCode(this) + ")";
+        }
+    }
+
+    private <T> CompletableFuture<T> appendWriteEvent(String name,
+                                                      long timeoutMs,
+                                                      ControllerWriteOperation<T> op) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        queue.appendWithDeadline(time.nanoseconds() +
+            TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS), event);
+        return event.future();
+    }
+
+    private <T> CompletableFuture<T> appendWriteEvent(String name,
+                                                      ControllerWriteOperation<T> op) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        queue.append(event);
+        return event.future();
+    }
+
+    class QuorumMetaLogListener implements MetaLogListener {
+        @Override
+        public void handleCommits(long offset, List<ApiMessage> messages) {
+            appendControlEvent("handleCommits[" + offset + "]", () -> {
+                if (curClaimEpoch == -1) {
+                    if (log.isDebugEnabled()) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Replaying commits from the active node up to " +
+                                "offset {}: {}.", offset, messages.stream().
+                                map(m -> m.toString()).collect(Collectors.joining(", ")));
+                        } else {
+                            log.debug("Replaying commits from the active node up to " +
+                                "offset {}.", offset);
+                        }
+                    }
+                    for (ApiMessage message : messages) {
+                        replay(message);
+                    }
+                } else {
+                    log.debug("Completing purgatory items up to offset {}.", offset);
+
+                    // Complete any events in the purgatory that were waiting for this offset.
+                    purgatory.completeUpTo(offset);
+
+                    // Delete all snapshots older than the offset.
+                    // TODO: add an exception here for when we're writing out a log snapshot
+                    snapshotRegistry.deleteSnapshotsUpTo(offset);
+                }
+                lastCommittedOffset = offset;
+            });
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader newLeader) {
+            if (newLeader.nodeId() == nodeId) {
+                final long newEpoch = newLeader.epoch();
+                appendControlEvent("handleClaim[" + newEpoch + "]", () -> {
+                    long curEpoch = curClaimEpoch;
+                    if (curEpoch != -1) {
+                        throw new RuntimeException("Tried to claim controller epoch " +
+                            newEpoch + ", but we never renounced controller epoch " +
+                            curEpoch);
+                    }
+                    log.info("Becoming active at controller epoch {}.", newEpoch);
+                    curClaimEpoch = newEpoch;
+                    writeOffset = lastCommittedOffset;
+                    clusterControl.activate();
+                });
+            }
+        }
+
+        @Override
+        public void handleRenounce(long oldEpoch) {
+            appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> {
+                if (curClaimEpoch == oldEpoch) {
+                    log.info("Renouncing the leadership at oldEpoch {} due to a metadata " +
+                        "log event. Reverting to last committed offset {}.", curClaimEpoch,
+                        lastCommittedOffset);
+                    renounce();
+                }
+            });
+        }
+
+        @Override
+        public void beginShutdown() {
+            queue.beginShutdown("MetaLogManager.Listener");
+        }
+    }
+
+    private void renounce() {
+        curClaimEpoch = -1;
+        purgatory.failAll(newNotControllerException());
+        snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+        snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
+        writeOffset = -1;
+        clusterControl.deactivate();
+        cancelMaybeFenceReplicas();
+    }
+
+    private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
+                                                ControllerWriteOperation<T> op) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
+        event.future.exceptionally(e -> {
+            if (e instanceof UnknownServerException && e.getCause() != null &&
+                e.getCause() instanceof RejectedExecutionException) {
+                log.error("Cancelling write event {} because the event queue is closed.", name);
+                return null;
+            }
+            log.error("Unexpected exception while executing deferred write event {}. " +
+                "Rescheduling for a minute from now.", name, e);
+            scheduleDeferredWriteEvent(name,
+                deadlineNs + TimeUnit.NANOSECONDS.convert(1, TimeUnit.MINUTES), op);
+            return null;
+        });
+    }
+
+    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
+
+    class MaybeFenceReplicas implements ControllerWriteOperation<Void> {
+        @Override
+        public ControllerResult<Void> generateRecordsAndResult() {
+            ControllerResult<Set<Integer>> result =
+                clusterControl.maybeFenceLeastRecentlyContacted();
+            for (int brokerId : result.response()) {
+                replicationControl.removeFromIsr(brokerId, result.records());
+            }
+            rescheduleMaybeFenceReplicas();
+            return new ControllerResult<>(result.records(), null);
+        }
+    }
+
+    private void rescheduleMaybeFenceReplicas() {
+        long nextCheckTimeNs = clusterControl.nextCheckTimeNs();
+        if (nextCheckTimeNs == Long.MAX_VALUE) {
+            cancelMaybeFenceReplicas();
+        } else {
+            scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
+                new MaybeFenceReplicas());
+        }
+    }
+
+    private void cancelMaybeFenceReplicas() {
+        queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void replay(ApiMessage message) {
+        try {
+            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+            switch (type) {
+                case REGISTER_BROKER_RECORD:
+                    clusterControl.replay((RegisterBrokerRecord) message);
+                    break;
+                case UNREGISTER_BROKER_RECORD:
+                    clusterControl.replay((UnregisterBrokerRecord) message);
+                    break;
+                case FENCE_BROKER_RECORD:
+                    clusterControl.replay((FenceBrokerRecord) message);
+                    break;
+                case UNFENCE_BROKER_RECORD:
+                    clusterControl.replay((UnfenceBrokerRecord) message);
+                    break;
+                case TOPIC_RECORD:
+                    replicationControl.replay((TopicRecord) message);
+                    break;
+                case PARTITION_RECORD:
+                    replicationControl.replay((PartitionRecord) message);
+                    break;
+                case CONFIG_RECORD:
+                    configurationControl.replay((ConfigRecord) message);
+                    break;
+                case QUOTA_RECORD:
+                    clientQuotaControlManager.replay((QuotaRecord) message);
+                    break;
+                case ISR_CHANGE_RECORD:
+                    replicationControl.replay((IsrChangeRecord) message);
+                    break;
+                default:
+                    throw new RuntimeException("Unhandled record type " + type);
+            }
+        } catch (Exception e) {
+            log.error("Error replaying record {}", message.toString(), e);
+        }
+    }
+
+    private final Logger log;
+
+    /**
+     * The ID of this controller node.
+     */
+    private final int nodeId;
+
+    /**
+     * The single-threaded queue that processes all of our events.
+     * It also processes timeouts.
+     */
+    private final KafkaEventQueue queue;
+
+    /**
+     * The Kafka clock object to use.
+     */
+    private final Time time;
+
+    /**
+     * A registry for snapshot data. This must be accessed only by the event queue thread.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The purgatory which holds deferred operations which are waiting for the metadata
+     * log's high water mark to advance. This must be accessed only by the event queue thread.
+     */
+    private final ControllerPurgatory purgatory;
+
+    /**
+     * An object which stores the controller's dynamic configuration.
+     * This must be accessed only by the event queue thread.
+     */
+    private final ConfigurationControlManager configurationControl;
+
+    /**
+     * An object which stores the controller's dynamic client quotas.
+     * This must be accessed only by the event queue thread.
+     */
+    private final ClientQuotaControlManager clientQuotaControlManager;
+
+    /**
+     * An object which stores the controller's view of the cluster.
+     * This must be accessed only by the event queue thread.
+     */
+    private final ClusterControlManager clusterControl;
+
+    /**
+     * An object which stores the controller's view of the cluster features.
+     * This must be accessed only by the event queue thread.
+     */
+    private final FeatureControlManager featureControl;
+
+    /**
+     * An object which stores the controller's view of topics and partitions.
+     * This must be accessed only by the event queue thread.
+     */
+    private final ReplicationControlManager replicationControl;
+
+    /**
+     * The interface that we use to mutate the Raft log.
+     */
+    private final MetaLogManager logManager;
+
+    /**
+     * The interface that receives callbacks from the Raft log. These callbacks are
+     * invoked from the Raft thread(s), not from the controller thread.
+     */
+    private final QuorumMetaLogListener metaLogListener;
+
+    /**
+     * If this controller is active, this is the non-negative controller epoch.
+     * Otherwise, this is -1. This variable must be modified only from the controller
+     * thread, but it can be read from other threads.
+     */
+    private volatile long curClaimEpoch;
+
+    /**
+     * The last offset we have committed, or -1 if we have not committed any offsets.
+     */
+    private long lastCommittedOffset;
+
+    /**
+     * If we have called scheduleWrite, this is the last offset we got back from it.
+     */
+    private long writeOffset;
+
+    private QuorumController(LogContext logContext,
+                             int nodeId,
+                             KafkaEventQueue queue,
+                             Time time,
+                             Map<ConfigResource.Type, ConfigDef> configDefs,
+                             MetaLogManager logManager,
+                             Map<String, VersionRange> supportedFeatures,
+                             short defaultReplicationFactor,
+                             int defaultNumPartitions,
+                             ReplicaPlacementPolicy replicaPlacementPolicy,
+                             long sessionTimeoutNs) throws Exception {
+        this.log = logContext.logger(QuorumController.class);
+        this.nodeId = nodeId;
+        this.queue = queue;
+        this.time = time;
+        this.snapshotRegistry = new SnapshotRegistry(logContext);
+        snapshotRegistry.createSnapshot(-1);
+        this.purgatory = new ControllerPurgatory();
+        this.configurationControl = new ConfigurationControlManager(logContext,
+            snapshotRegistry, configDefs);
+        this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
+        this.clusterControl = new ClusterControlManager(logContext, time,
+            snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy);
+        this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
+        this.replicationControl = new ReplicationControlManager(snapshotRegistry,
+            logContext, new Random(), defaultReplicationFactor, defaultNumPartitions,
+            configurationControl, clusterControl);
+        this.logManager = logManager;
+        this.metaLogListener = new QuorumMetaLogListener();
+        this.curClaimEpoch = -1L;
+        this.lastCommittedOffset = -1L;
+        this.writeOffset = -1L;
+        this.logManager.register(metaLogListener);
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        return appendWriteEvent("alterIsr", () ->
+            replicationControl.alterIsr(request));
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        return appendWriteEvent("createTopics", () ->
+            replicationControl.createTopics(request));
+    }
+
+    @Override
+    public CompletableFuture<Void> decommissionBroker(int brokerId) {
+        return appendWriteEvent("decommissionBroker", () -> {
+            ControllerResult<Void> result = clusterControl.decommissionBroker(brokerId);
+            replicationControl.removeFromIsr(brokerId, result.records());
+            return result;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
+            describeConfigs(Map<ConfigResource, Collection<String>> resources) {
+        return appendReadEvent("describeConfigs", () ->
+            configurationControl.describeConfigs(lastCommittedOffset, resources));
+    }
+
+    @Override
+    public CompletableFuture<ElectLeadersResponseData>
+            electLeaders(ElectLeadersRequestData request) {
+        CompletableFuture<ElectLeadersResponseData> future = new CompletableFuture<>();
+        appendWriteEvent("electLeaders", request.timeoutMs(),
+            () -> replicationControl.electLeaders(request));
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<FeatureManager.FinalizedFeaturesAndEpoch> finalizedFeatures() {
+        return appendReadEvent("getFinalizedFeatures",
+            () -> featureControl.finalizedFeaturesAndEpoch(lastCommittedOffset));
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
+            boolean validateOnly) {
+        return appendWriteEvent("incrementalAlterConfigs", () -> {
+            ControllerResult<Map<ConfigResource, ApiError>> result =
+                configurationControl.incrementalAlterConfigs(configChanges);
+            if (validateOnly) {
+                return result.withoutRecords();
+            } else {
+                return result;
+            }
+        });
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+            Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+        return appendWriteEvent("legacyAlterConfigs", () -> {
+            ControllerResult<Map<ConfigResource, ApiError>> result =
+                configurationControl.legacyAlterConfigs(newConfigs);
+            if (validateOnly) {
+                return result.withoutRecords();
+            } else {
+                return result;
+            }
+        });
+    }
+
+    class ProcessBrokerHeartbeatOperation
+            implements ControllerWriteOperation<BrokerHeartbeatReply> {
+        private final BrokerHeartbeatRequestData request;
+        private boolean updateShutdownOffset = false;
+
+        ProcessBrokerHeartbeatOperation(BrokerHeartbeatRequestData request) {
+            this.request = request;
+        }
+
+        @Override
+        public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+            clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch());
+            List<ApiMessageAndVersion> records = new ArrayList<>();
+            boolean movingLeadersForShutDown = false;
+            if (request.wantShutDown()) {
+                records = replicationControl.removeLeaderships(request.brokerId());

Review comment:

> This call generates a leader change record before the following fencedBroker record. Ordering-wise, it seems that we should record the fencedBroker first.

Hmm... it should be OK to remove the leaderships first. KIP-500 controlled shutdown works this way, for example... the shutting-down broker is not actually fenced at all until all the other brokers have removed it as a leader.

Also, wouldn't it be a bit weird to be in a state where a broker is still marked as the leader for some partition, but doesn't show up in the list of brokers given in the MetadataResponse? That would happen if we put the fencing record first. I don't think clients or brokers would handle this well.

> Also, I am wondering what's the best place to trigger leader election and removal from the ISR due to a fenced broker. There are multiple cases where a broker can be fenced (e.g. broker controlled shutdown, broker fenced due to no heartbeat). Instead of doing leader election and ISR removal in all those cases, another option is to trigger them in a single place, when a fencedBroker record is replayed.

Replaying a record cannot trigger the creation of any additional records. This would not work, since the standby controllers can't create records... only the active controller can. I have created a `handleBrokerFenced` function in `ReplicationControlManager` which does most of what needs to be done for fencing, though... aside from creating the actual fencing record.
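To make that constraint concrete, here is a minimal sketch of the replay rule (the class, map, and helper names below are made-up illustrations, not the actual `ClusterControlManager` code):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.metadata.FenceBrokerRecord;

// Hypothetical sketch: replaying a committed record is a pure in-memory
// state mutation. Standby controllers run this same replay path for every
// committed record, and they cannot append to the metadata log, so
// replay() must never generate or schedule follow-up records.
final class FencingReplaySketch {
    // brokerId -> fenced? (a stand-in for the real timeline data structures)
    private final Map<Integer, Boolean> fenced = new HashMap<>();

    void replay(FenceBrokerRecord record) {
        fenced.put(record.id(), true);
        // Deliberately no leader election or ISR-change records here. The
        // active controller generates those up front, in
        // generateRecordsAndResult() (via handleBrokerFenced), so that they
        // are committed alongside the FenceBrokerRecord itself.
    }
}
```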