jsancio commented on a change in pull request #10497:
URL: https://github.com/apache/kafka/pull/10497#discussion_r621649450



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()
+  private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
 
-  def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+  def kafkaRaftClient: KafkaRaftClient[T] = client

Review comment:
       I think that since you added a new method `client: RaftClient[T]` to 
`RaftManager[T]`, and `KafkaRaftManager` overrides it as `client: 
KafkaRaftClient[T]`, we should be able to remove this public method, which 
exists only on `KafkaRaftManager[T]`.
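
A minimal Java sketch of the covariant override described above. The types below are hypothetical stand-ins, not the real Kafka classes, and `pendingRequests` is an invented method used only to show that callers of the concrete manager see the narrower type without a separate accessor:

```java
final class CovariantClientSketch {
    // Stand-in for the general client interface (assumption, not the Kafka API).
    interface RaftClient {
        String name();
    }

    // Stand-in for the concrete client with one extra, hypothetical method.
    static final class KafkaRaftClient implements RaftClient {
        @Override
        public String name() {
            return "kafka";
        }

        int pendingRequests() {
            return 0;
        }
    }

    // The manager interface exposes the general type.
    interface RaftManager {
        RaftClient client();
    }

    // The concrete manager narrows the return type, so a second
    // accessor returning the concrete client is redundant.
    static final class KafkaRaftManager implements RaftManager {
        private final KafkaRaftClient client = new KafkaRaftClient();

        @Override
        public KafkaRaftClient client() {
            return client;
        }
    }
}
```

Callers that hold a `KafkaRaftManager` get a `KafkaRaftClient` from `client()`, while callers that hold a `RaftManager` still get a `RaftClient`.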

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -126,10 +130,10 @@ class KafkaRaftManager[T](
   private val dataDir = createDataDir()
   private val metadataLog = buildMetadataLog()
   private val netChannel = buildNetworkChannel()
-  private val raftClient = buildRaftClient()
-  private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+  val client: KafkaRaftClient[T] = buildRaftClient()

Review comment:
       Did you mean to override the return type from `RaftClient[T]` to 
`KafkaRaftClient[T]`?

##########
File path: core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
##########
@@ -108,18 +109,29 @@ class BrokerMetadataListenerTest {
     assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
   }
 
+  private def applyBatch(
+    records: List[ApiMessageAndVersion]
+  ): Unit = {
+    val baseOffset = lastMetadataOffset + 1

Review comment:
       Hmm. This is minor, but does this mean that a `baseOffset` of 0 is not 
possible, since `lastMetadataOffset` is initialized to `0`? Is this also true 
for a "regular" Kafka topic partition? Or is this just a side effect of how 
this test gets constructed?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader<T> reader) {
             listener.handleCommit(reader);
         }
 
-        void maybeFireHandleClaim(int epoch, long epochStartOffset) {
-            // We can fire `handleClaim` as soon as the listener has caught
-            // up to the start of the leader epoch. This guarantees that the
-            // state machine has seen the full committed state before it becomes
-            // leader and begins writing to the log.
-            if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
-                claimedEpoch = epoch;
-                listener.handleClaim(epoch);
+        void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+            if (shouldFireLeaderChange(leaderAndEpoch)) {
+                lastFiredLeaderChange = leaderAndEpoch;
+                listener.handleLeaderChange(leaderAndEpoch);
             }
         }
 
-        void fireHandleResign(int epoch) {
-            listener.handleResign(epoch);
+        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+            if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
+                return false;
+            } else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
+                return true;

Review comment:
       I see. We want to fire this event even if the `leader` is 
`Optional.empty()` because we use this event to propagate loss of leadership.

##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -331,22 +342,40 @@ public void beginShutdown() {
     }
 
     @Override
-    public void close() throws InterruptedException {
+    public void close() {
         log.debug("Node {}: closing.", nodeId);
         beginShutdown();
-        eventQueue.close();
+
+        try {
+            eventQueue.close();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();

Review comment:
       Can you explain why we are doing this?
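
For context, the `catch` block in the hunk above matches the common Java idiom for code that cannot propagate `InterruptedException`: re-set the thread's interrupt flag so callers further up the stack can still observe it. A minimal sketch, with a hypothetical `BlockingResource` standing in for the event queue:

```java
final class InterruptRestoreSketch {
    // Hypothetical stand-in for a resource whose close() may block.
    interface BlockingResource {
        void close() throws InterruptedException;
    }

    // Cannot declare InterruptedException, so restore the flag instead
    // of swallowing the interruption silently.
    static void closeQuietly(BlockingResource resource) {
        try {
            resource.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simulates an interrupted close and reports whether the flag was
    // restored. Thread.interrupted() also clears the flag again.
    static boolean flagRestoredAfterInterruptedClose() {
        closeQuietly(() -> {
            throw new InterruptedException("simulated");
        });
        return Thread.interrupted();
    }
}
```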

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
##########
@@ -28,6 +28,10 @@ public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
         this.epoch = epoch;
     }
 
+    public boolean isLeader(int nodeId) {
+        return leaderId.isPresent() && leaderId.getAsInt() == nodeId;

Review comment:
       Minor, but how about `return leaderId.equals(OptionalInt.of(nodeId));`?
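
A small sketch comparing the two forms. `OptionalInt.equals` returns `true` only when both values are present and equal or both are empty, so the two methods below should agree for every input. The class and method names are made up for illustration:

```java
import java.util.OptionalInt;

final class LeaderIdCheckSketch {
    // The form used in the hunk above.
    static boolean isLeaderExplicit(OptionalInt leaderId, int nodeId) {
        return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
    }

    // The suggested form: OptionalInt.of(nodeId) is always present, so an
    // empty leaderId never compares equal to it.
    static boolean isLeaderViaEquals(OptionalInt leaderId, int nodeId) {
        return leaderId.equals(OptionalInt.of(nodeId));
    }
}
```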

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2408,19 +2418,32 @@ private void fireHandleCommit(BatchReader<T> reader) {
             listener.handleCommit(reader);
         }
 
-        void maybeFireHandleClaim(int epoch, long epochStartOffset) {
-            // We can fire `handleClaim` as soon as the listener has caught
-            // up to the start of the leader epoch. This guarantees that the
-            // state machine has seen the full committed state before it becomes
-            // leader and begins writing to the log.
-            if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
-                claimedEpoch = epoch;
-                listener.handleClaim(epoch);
+        void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+            if (shouldFireLeaderChange(leaderAndEpoch)) {
+                lastFiredLeaderChange = leaderAndEpoch;
+                listener.handleLeaderChange(leaderAndEpoch);
             }
         }
 
-        void fireHandleResign(int epoch) {
-            listener.handleResign(epoch);
+        private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+            if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
+                return false;
+            } else if (leaderAndEpoch.epoch > lastFiredLeaderChange.epoch) {
+                return true;
+            } else {
+                return leaderAndEpoch.leaderId.isPresent() && !lastFiredLeaderChange.leaderId.isPresent();

Review comment:
       This works because there is an invariant that `leaderAndEpoch.epoch >= 
lastFiredLeaderChange.epoch` is always `true`, right? Should we document this 
above this line?
   
   Thinking about it some more, wouldn't this expression always be `true`, since in this branch we know that:
   1. `leaderAndEpoch.epoch == lastFiredLeaderChange.epoch`
   2. `!leaderAndEpoch.equals(lastFiredLeaderChange)`
   
   If you agree, I think that we can change this method to just:
   ```
   return !leaderAndEpoch.equals(lastFiredLeaderChange);
   ```
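
A sketch of the two variants side by side, using a stand-in `LeaderAndEpoch` class (an assumption for illustration, not the real one). It assumes the invariant discussed above: the epoch never decreases, and within an epoch the leader only goes from unknown to known.

```java
import java.util.Objects;
import java.util.OptionalInt;

final class LeaderChangeSketch {
    // Stand-in value type with value-based equality.
    static final class LeaderAndEpoch {
        final OptionalInt leaderId;
        final int epoch;

        LeaderAndEpoch(OptionalInt leaderId, int epoch) {
            this.leaderId = leaderId;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof LeaderAndEpoch)) return false;
            LeaderAndEpoch other = (LeaderAndEpoch) o;
            return epoch == other.epoch && leaderId.equals(other.leaderId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(leaderId, epoch);
        }
    }

    // The three-branch check from the hunk above.
    static boolean threeBranch(LeaderAndEpoch next, LeaderAndEpoch last) {
        if (next.equals(last)) {
            return false;
        } else if (next.epoch > last.epoch) {
            return true;
        } else {
            return next.leaderId.isPresent() && !last.leaderId.isPresent();
        }
    }

    // The proposed simplification.
    static boolean simplified(LeaderAndEpoch next, LeaderAndEpoch last) {
        return !next.equals(last);
    }
}
```

The two methods agree whenever the invariant holds. They differ only for a stale update with a lower epoch, where `threeBranch` returns `false` and `simplified` returns `true`, which is why documenting the invariant seems worthwhile.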

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -200,7 +202,7 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {
 
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
-            if (logManager == null) {
+            if (raftClient == null) {
                 throw new RuntimeException("You must set a metadata log manager.");

Review comment:
       Let's change the message for this exception. Maybe "Raft client was not 
set for the quorum controller"

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -672,19 +688,14 @@ public void handleNewLeader(MetaLogLeader newLeader) {
                     writeOffset = lastCommittedOffset;
                     clusterControl.activate();
                 });
-            }
-        }
-
-        @Override
-        public void handleRenounce(long oldEpoch) {
-            appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> {
-                if (curClaimEpoch == oldEpoch) {
+            } else if (curClaimEpoch != -1) {

Review comment:
       Is it safe to read `curClaimEpoch` and compare it against `-1` outside 
the controller thread? Why not always send a "handleRenounce" event? For 
example, I think the following trace/state is not possible with this change, and it should be:
   
   ```
   curClaimEpoch = -1
   queue = Queue("handleClaim[1]", "handleRenounce[1]")
   ```
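
A minimal single-threaded sketch of the alternative: always enqueue the renounce event and let the event itself check `curClaimEpoch` when it runs. The queue here is a plain `ArrayDeque` drained by hand, standing in for the controller's event queue. All names other than `curClaimEpoch` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RenounceEventSketch {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    // Written only from inside queued events, never by the enqueuing side.
    private long curClaimEpoch = -1;
    private int renounceCount = 0;

    // Called from the "raft" side: never inspects curClaimEpoch directly.
    void handleLeaderChange(boolean localNodeIsLeader, long epoch) {
        if (localNodeIsLeader) {
            queue.add(() -> curClaimEpoch = epoch);
        } else {
            // Always enqueue; the check happens when the event runs.
            queue.add(() -> {
                if (curClaimEpoch != -1) {
                    curClaimEpoch = -1;
                    renounceCount++;
                }
            });
        }
    }

    // Stands in for the controller thread draining its queue in order.
    void runPendingEvents() {
        Runnable event;
        while ((event = queue.poll()) != null) {
            event.run();
        }
    }

    int pendingEvents() {
        return queue.size();
    }

    long curClaimEpoch() {
        return curClaimEpoch;
    }

    int renounceCount() {
        return renounceCount;
    }
}
```

With this shape, a claim followed immediately by a loss of leadership leaves two events in the queue while `curClaimEpoch` is still `-1`, and the renounce is applied once both have run.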

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -557,9 +559,9 @@ public void run() throws Exception {
                 // out asynchronously.
                 final long offset;
                 if (result.isAtomic()) {
-                    offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+                    offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
                 } else {
-                    offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                    offset = raftClient.scheduleAppend(controllerEpoch, result.records());

Review comment:
       We have https://issues.apache.org/jira/browse/KAFKA-12158 for this issue.
   
   @hachikuji I think for now we should at least check the return value with 
`offset = Objects.requireNonNull(...)`.
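
A minimal sketch of that check using `java.util.Objects.requireNonNull`. The stub below is hypothetical and only models an append call that returns a boxed `Long`, which may be `null` when the append is not accepted. It is not the real `scheduleAppend` signature.

```java
import java.util.Objects;

final class NonNullOffsetSketch {
    // Hypothetical stub: returns an offset, or null when the append is rejected.
    static Long scheduleAppendStub(boolean accepted) {
        return accepted ? Long.valueOf(42L) : null;
    }

    // Fails fast with a descriptive NullPointerException instead of
    // failing later on an implicit unboxing of null.
    static long appendOrFail(boolean accepted) {
        return Objects.requireNonNull(
            scheduleAppendStub(accepted),
            "append was not accepted by the raft client"
        );
    }
}
```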

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -73,21 +76,33 @@ class BrokerMetadataListener(brokerId: Int,
   /**
    * Handle new metadata records.
    */
-  override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
-    eventQueue.append(new HandleCommitsEvent(lastOffset, records))
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+    eventQueue.append(new HandleCommitsEvent(reader))
   }
 
-  // Visible for testing. It's useful to execute events synchronously
-  private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
-    new HandleCommitsEvent(lastOffset, records).run()
+  // Visible for testing. It's useful to execute events synchronously in order
+  // to make tests deterministic
+  private[metadata] def execCommits(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = {
+    new HandleCommitsEvent(BatchReader.singleton(batch)).run()
   }
 
-  class HandleCommitsEvent(lastOffset: Long,
-                           records: util.List[ApiMessage])
-      extends EventQueue.FailureLoggingEvent(log) {
+  class HandleCommitsEvent(
+    reader: BatchReader[ApiMessageAndVersion]
+  ) extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
+      try {
+        apply(reader.next())

Review comment:
       I think this is technically correct based on the current implementation 
of `KafkaRaftClient` but is there a reason why the listener is only reading one 
batch? Also, should it assume that the reader contains at least one batch?
   
   When `handleCommit` is fired because of an `appendAsLeader`, the reader is 
guaranteed to have one batch. When reading from the `ReplicatedLog`, the batch 
reader may have more than one batch.
   
   Should this method instead do the following?
   ```scala
   override def run(): Unit = {
     try {
       while(reader.hasNext()) {
         apply(reader.next())
       }
     } finally {
       reader.close()
     }
   }
   ```

##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -94,38 +94,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
         }
 
         @Override
-        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
-            appendEvent("handleCommits", () -> {
-                log.debug("handleCommits " + messages + " at offset " + lastOffset);
-                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
-                dir.create("offset").setContents(String.valueOf(lastOffset));
-                for (ApiMessage message : messages) {
-                    handleMessage(message);
-                }
-            }, null);
-        }
-
-        @Override
-        public void handleNewLeader(MetaLogLeader leader) {
+        public void handleLeaderChange(LeaderAndEpoch leader) {
             appendEvent("handleNewLeader", () -> {

Review comment:
       This is probably outside the scope of this PR but it looks like `queue` 
is never read.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -45,24 +46,14 @@
         void handleCommit(BatchReader<T> reader);
 
         /**
-         * Invoked after this node has become a leader. This is only called after
-         * all commits up to the start of the leader's epoch have been sent to
-         * {@link #handleCommit(BatchReader)}.
+         * Called on any change to leadership. This includes both when a leader is elected and
+         * when a leader steps down or fails.
          *
-         * After becoming a leader, the client is eligible to write to the log
-         * using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
-         *
-         * @param epoch the claimed leader epoch
+         * @param leader the current leader and epoch
          */
-        default void handleClaim(int epoch) {}
+        default void handleLeaderChange(LeaderAndEpoch leader) {}
 
-        /**
-         * Invoked after a leader has stepped down. This callback may or may not
-         * fire before the next leader has been elected.
-         *
-         * @param epoch the epoch that the leader is resigning from
-         */
-        default void handleResign(int epoch) {}
+        default void beginShutdown() {}

Review comment:
       Let's document this method.



