cmccabe commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r633900447



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -234,22 +259,20 @@ class BrokerMetadataListener(brokerId: Int,
     clientQuotaManager.handleQuotaRecord(record)
   }
 
-  class HandleNewLeaderEvent(leader: MetaLogLeader)
+  class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
       val imageBuilder =
         MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
-      if (leader.nodeId() < 0) {
-        imageBuilder.controllerId(None)
-      } else {
-        imageBuilder.controllerId(Some(leader.nodeId()))
-      }
+      imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
       metadataCache.image(imageBuilder.build())
     }
   }
 
-  override def handleNewLeader(leader: MetaLogLeader): Unit = {
-    eventQueue.append(new HandleNewLeaderEvent(leader))
+  override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
+    if (leader.isLeader(brokerId)) {
+      eventQueue.append(new HandleNewLeaderEvent(leader))

Review comment:
This doesn't seem correct. We need to know the controller ID even if it's not the same as our broker ID.

(It's not even clear that there will always be a broker with the same ID as any controller, since their ID spaces are separate.)
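
To illustrate the concern, here is a minimal, self-contained sketch of a listener that records the controller ID on every leader change, without the `isLeader(brokerId)` guard. It is not the actual Kafka code: `LeaderAndEpoch` below is a simplified stand-in for the Raft type, and `SketchListener`, `eventCount`, and the in-memory buffer are hypothetical names used only for illustration.

```scala
import java.util.OptionalInt
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in (assumption) for the Raft LeaderAndEpoch type;
// only the members needed for this sketch are modeled.
final case class LeaderAndEpoch(leaderId: OptionalInt, epoch: Int) {
  def isLeader(nodeId: Int): Boolean =
    leaderId.isPresent && leaderId.getAsInt == nodeId
}

// Hypothetical listener: remembers the most recently announced controller ID.
final class SketchListener(val brokerId: Int) {
  // Stands in for the event queue; every leader change is recorded.
  private val events = ArrayBuffer.empty[LeaderAndEpoch]

  // None means no controller is currently known.
  var controllerId: Option[Int] = None

  def eventCount: Int = events.size

  def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
    // Deliberately no isLeader(brokerId) check: the broker must learn the
    // controller ID even when the controller is a different node.
    events += leader
    controllerId =
      if (leader.leaderId.isPresent) Some(leader.leaderId.getAsInt) else None
  }
}
```

With this shape, a broker with ID 1 that is told the leader is node 3000 ends up with `controllerId == Some(3000)`. With the guard from the diff, that update would be dropped because `leader.isLeader(1)` is false.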



