cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577125115



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.QuotaRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureManager;
+import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+
+public final class QuorumController implements Controller {
+    /**
+     * A builder class which creates the QuorumController.
+     */
+    public static class Builder {
+        private final int nodeId;
+        private Time time = Time.SYSTEM;
+        private String threadNamePrefix = null;
+        private LogContext logContext = null;
+        private Map<ConfigResource.Type, ConfigDef> configDefs = Collections.emptyMap();
+        private MetaLogManager logManager = null;
+        private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
+        private short defaultReplicationFactor = 3;
+        private int defaultNumPartitions = 1;
+        private ReplicaPlacementPolicy replicaPlacementPolicy =
+            new SimpleReplicaPlacementPolicy(new Random());
+        private long sessionTimeoutNs = TimeUnit.NANOSECONDS.convert(18, TimeUnit.SECONDS);
+
+        public Builder(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
+        public Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public Builder setConfigDefs(Map<ConfigResource.Type, ConfigDef> configDefs) {
+            this.configDefs = configDefs;
+            return this;
+        }
+
+        public Builder setLogManager(MetaLogManager logManager) {
+            this.logManager = logManager;
+            return this;
+        }
+
+        public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) {
+            this.supportedFeatures = supportedFeatures;
+            return this;
+        }
+
+        public Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
+            this.defaultReplicationFactor = defaultReplicationFactor;
+            return this;
+        }
+
+        public Builder setDefaultNumPartitions(int defaultNumPartitions) {
+            this.defaultNumPartitions = defaultNumPartitions;
+            return this;
+        }
+
+        public Builder setReplicaPlacementPolicy(ReplicaPlacementPolicy replicaPlacementPolicy) {
+            this.replicaPlacementPolicy = replicaPlacementPolicy;
+            return this;
+        }
+
+        public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
+            this.sessionTimeoutNs = sessionTimeoutNs;
+            return this;
+        }
+
+        public QuorumController build() throws Exception {
+            if (logManager == null) {
+                throw new RuntimeException("You must set a metadata log manager.");
+            }
+            if (threadNamePrefix == null) {
+                threadNamePrefix = String.format("Node%d_", nodeId);
+            }
+            if (logContext == null) {
+                logContext = new LogContext(String.format("[Controller %d] ", nodeId));
+            }
+            KafkaEventQueue queue = null;
+            try {
+                queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+                return new QuorumController(logContext, nodeId, queue, time, configDefs,
+                        logManager, supportedFeatures, defaultReplicationFactor,
+                        defaultNumPartitions, replicaPlacementPolicy, sessionTimeoutNs);
+            } catch (Exception e) {
+                Utils.closeQuietly(queue, "event queue");
+                throw e;
+            }
+        }
+    }
+
+    private static final String ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX =
+        "The active controller appears to be node ";
+
+    private NotControllerException newNotControllerException() {
+        int latestController = logManager.leader().nodeId();
+        if (latestController < 0) {
+            return new NotControllerException("No controller appears to be active.");
+        } else {
+            return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
+                latestController);
+        }
+    }
+
+    public static int exceptionToApparentController(NotControllerException e) {
+        if (e.getMessage().startsWith(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX)) {
+            return Integer.parseInt(e.getMessage().substring(
+                ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX.length()));
+        } else {
+            return -1;
+        }
+    }
+
+    private void handleEventEnd(String name, long startProcessingTimeNs) {
+        long endProcessingTime = time.nanoseconds();
+        long deltaNs = endProcessingTime - startProcessingTimeNs;
+        log.debug("Processed {} in {} us", name,
+            TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS));
+    }
+
+    private Throwable handleEventException(String name,
+                                           Optional<Long> startProcessingTimeNs,
+                                           Throwable exception) {
+        if (!startProcessingTimeNs.isPresent()) {
+            log.debug("unable to start processing {} because of {}.", name,
+                exception.getClass().getSimpleName());
+            if (exception instanceof ApiException) {
+                return exception;
+            } else {
+                return new UnknownServerException(exception);
+            }
+        }
+        long endProcessingTime = time.nanoseconds();
+        long deltaNs = endProcessingTime - startProcessingTimeNs.get();
+        long deltaUs = TimeUnit.MICROSECONDS.convert(deltaNs, TimeUnit.NANOSECONDS);
+        if (exception instanceof ApiException) {
+            log.debug("{}: failed with {} in {} us", name,
+                exception.getClass().getSimpleName(), deltaUs);
+            return exception;
+        }
+        log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
+            "Reverting to last committed offset {}.",
+            this, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
+            lastCommittedOffset, exception);
+        renounce();
+        return new UnknownServerException(exception);
+    }
+
+    /**
+     * A controller event for handling internal state changes, such as Raft inputs.
+     */
+    class ControlEvent implements EventQueue.Event {
+        private final String name;
+        private final Runnable handler;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+
+        ControlEvent(String name, Runnable handler) {
+            this.name = name;
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() throws Exception {
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            log.debug("Executing {}.", this);
+            handler.run();
+            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            handleEventException(name, startProcessingTimeNs, exception);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    private void appendControlEvent(String name, Runnable handler) {
+        ControlEvent event = new ControlEvent(name, handler);
+        queue.append(event);
+    }
+
+    /**
+     * A controller event that reads the committed internal state in order to expose it
+     * to an API.
+     */
+    class ControllerReadEvent<T> implements EventQueue.Event {
+        private final String name;
+        private final CompletableFuture<T> future;
+        private final Supplier<T> handler;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+
+        ControllerReadEvent(String name, Supplier<T> handler) {
+            this.name = name;
+            this.future = new CompletableFuture<T>();
+            this.handler = handler;
+        }
+
+        CompletableFuture<T> future() {
+            return future;
+        }
+
+        @Override
+        public void run() throws Exception {
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            T value = handler.get();
+            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+            future.complete(value);
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            future.completeExceptionally(
+                handleEventException(name, startProcessingTimeNs, exception));
+        }
+
+        @Override
+        public String toString() {
+            return name + "(" + System.identityHashCode(this) + ")";
+        }
+    }
+
+    // VisibleForTesting
+    ReplicationControlManager replicationControl() {
+        return replicationControl;
+    }
+
+    // VisibleForTesting
+    <T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
+        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
+        queue.append(event);
+        return event.future();
+    }
+
+    interface ControllerWriteOperation<T> {
+        /**
+         * Generate the metadata records needed to implement this controller write
+         * operation.  In general, this operation should not modify the "hard state" of
+         * the controller.  That modification will happen later on, when we replay the
+         * records generated by this function.
+         *
+         * There are cases where this function modifies the "soft state" of the
+         * controller.  Mainly, this happens when we process cluster heartbeats.
+         *
+         * This function also generates an RPC result.  In general, if the RPC resulted in
+         * an error, the RPC result will be an error, and the generated record list will
+         * be empty.  This would happen if we tried to create a topic with incorrect
+         * parameters, for example.  Of course, partial errors are possible for batch
+         * operations.
+         *
+         * @return              A tuple containing a list of records, and an RPC result.
+         */
+        ControllerResult<T> generateRecordsAndResult() throws Exception;
+
+        /**
+         * Once we've passed the records to the Raft layer, we will invoke this function
+         * with the end offset at which those records were placed.  If there were no
+         * records to write, we'll just pass the last write offset.
+         */
+        default void processBatchEndOffset(long offset) {}
+    }
+
+    /**
+     * A controller event that modifies the controller state.
+     */
+    class ControllerWriteEvent<T> implements EventQueue.Event, DeferredEvent {
+        private final String name;
+        private final CompletableFuture<T> future;
+        private final ControllerWriteOperation<T> op;
+        private Optional<Long> startProcessingTimeNs = Optional.empty();
+        private ControllerResultAndOffset<T> resultAndOffset;
+
+        ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
+            this.name = name;
+            this.future = new CompletableFuture<T>();
+            this.op = op;
+            this.resultAndOffset = null;
+        }
+
+        CompletableFuture<T> future() {
+            return future;
+        }
+
+        @Override
+        public void run() throws Exception {
+            long controllerEpoch = curClaimEpoch;
+            if (controllerEpoch == -1) {
+                throw newNotControllerException();
+            }
+            startProcessingTimeNs = Optional.of(time.nanoseconds());
+            ControllerResult<T> result = op.generateRecordsAndResult();
+            if (result.records().isEmpty()) {
+                op.processBatchEndOffset(writeOffset);
+                // If the operation did not return any records, then it was actually just
+                // a read after all, and not a read + write.  However, this read was done
+                // from the latest in-memory state, which might contain uncommitted data.
+                Optional<Long> maybeOffset = purgatory.highestPendingOffset();
+                if (!maybeOffset.isPresent()) {
+                    // If the purgatory is empty, there are no pending operations and no
+                    // uncommitted state.  We can return immediately.
+                    this.resultAndOffset = new ControllerResultAndOffset<>(-1,
+                        new ArrayList<>(), result.response());
+                    log.debug("Completing read-only operation {} immediately 
because " +
+                        "the purgatory is empty.", this);
+                    complete(null);
+                    return;
+                }
+                // If there are operations in the purgatory, we want to wait for the latest
+                // one to complete before returning our result to the user.
+                this.resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
+                    result.records(), result.response());
+                log.debug("Read-only operation {} will be completed when the 
log " +
+                    "reaches offset {}", this, resultAndOffset.offset());
+            } else {
+                // If the operation returned a batch of records, those records need to be
+                // written before we can return our result to the user.  Here, we hand off
+                // the batch of records to the metadata log manager.  They will be written
+                // out asynchronously.
+                long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                op.processBatchEndOffset(offset);
+                writeOffset = offset;
+                this.resultAndOffset = new ControllerResultAndOffset<>(offset,
+                    result.records(), result.response());
+                for (ApiMessageAndVersion message : result.records()) {
+                    replay(message.message());
+                }
+                snapshotRegistry.createSnapshot(offset);
+                log.debug("Read-write operation {} will be completed when the 
log " +
+                    "reaches offset {}.", this, resultAndOffset.offset());
+            }
+            purgatory.add(resultAndOffset.offset(), this);
+        }
+
+        @Override
+        public void handleException(Throwable exception) {
+            complete(exception);
+        }
+
+        @Override
+        public void complete(Throwable exception) {
+            if (exception == null) {
+                handleEventEnd(this.toString(), startProcessingTimeNs.get());
+                future.complete(resultAndOffset.response());
+            } else {
+                future.completeExceptionally(
+                    handleEventException(name, startProcessingTimeNs, exception));
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name + "(" + System.identityHashCode(this) + ")";
+        }
+    }
+
+    private <T> CompletableFuture<T> appendWriteEvent(String name,
+                                                      long timeoutMs,
+                                                      ControllerWriteOperation<T> op) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        queue.appendWithDeadline(time.nanoseconds() +
+            TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS), event);
+        return event.future();
+    }
+
+    private <T> CompletableFuture<T> appendWriteEvent(String name,
+                                                      ControllerWriteOperation<T> op) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        queue.append(event);
+        return event.future();
+    }
+
+    class QuorumMetaLogListener implements MetaLogListener {
+        @Override
+        public void handleCommits(long offset, List<ApiMessage> messages) {
+            appendControlEvent("handleCommits[" + offset + "]", () -> {
+                if (curClaimEpoch == -1) {
+                    if (log.isDebugEnabled()) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("Replaying commits from the active node 
up to " +
+                                "offset {}: {}.", offset, messages.stream().
+                                map(m -> 
m.toString()).collect(Collectors.joining(", ")));
+                        } else {
+                            log.debug("Replaying commits from the active node 
up to " +
+                                "offset {}.", offset);
+                        }
+                    }
+                    for (ApiMessage message : messages) {
+                        replay(message);

Review comment:
       This code is only executed by the followers.  It is true that the leader has
       already applied these log messages, but these nodes are not the leader: this is
       the first time they see the records, so they must replay them here.
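To make the asymmetry described above concrete, here is a minimal,
self-contained sketch of the pattern.  It is not the code under review:
ReplicatedCounter, write(), handleCommit(), and apply() are invented for
illustration, and only the shape mirrors the patch.  The leader applies each
record to its soft state as soon as it hands the record to the log, so
replaying again at commit time would double-apply; a follower first sees the
record at commit time, which is exactly when it must apply it.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the replay asymmetry; all names are invented for
// illustration, and only the pattern mirrors the patch.
class ReplicatedCounter {
    private final boolean isLeader;
    private long value = 0;                              // the "soft state"
    private final List<String> log = new ArrayList<>();  // stand-in for the metadata log

    ReplicatedCounter(boolean isLeader) {
        this.isLeader = isLeader;
    }

    // Leader-only write path: append to the log AND apply immediately.
    // Analogous to logManager.scheduleWrite() followed by the replay()
    // loop in ControllerWriteEvent.run().
    void write(String record) {
        if (!isLeader) throw new IllegalStateException("not the leader");
        log.add(record);
        apply(record);
    }

    // Commit path, eventually invoked on every node: only followers apply
    // here.  Analogous to the curClaimEpoch == -1 branch of handleCommits().
    void handleCommit(String record) {
        if (isLeader) {
            return;        // the leader already applied this record in write()
        }
        apply(record);     // a follower sees the record for the first time
    }

    private void apply(String record) {
        value++;           // trivial state machine: count applied records
    }

    long value() {
        return value;
    }
}

In the patch itself, the "am I the leader?" test is the curClaimEpoch == -1
check inside QuorumMetaLogListener.handleCommits(), and the leader-side apply
is the replay(message.message()) loop that runs right after
logManager.scheduleWrite() in ControllerWriteEvent.run().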




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

