mumrah commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1046431404


##########
clients/src/main/resources/common/message/StopReplicaRequest.json:
##########
@@ -31,7 +31,7 @@
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The controller id." },
-    { "name": "KRaftControllerId", "type": "int32", "versions": "4+", 
"entityType": "brokerId", "default": "-1",
+    { "name": "KRaftControllerId", "type": "int32", "versions": "4+", 
"default": "-1", "entityType": "brokerId", "default": "-1",

Review Comment:
   Same here: "default" is specified twice.



##########
clients/src/main/resources/common/message/LeaderAndIsrRequest.json:
##########
@@ -36,7 +36,7 @@
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The current controller ID." },
-    { "name": "KRaftControllerId", "type": "int32", "versions": "7+", 
"entityType": "brokerId", "default": "-1",
+    { "name": "KRaftControllerId", "type": "int32", "versions": "7+", 
"default": "-1", "entityType": "brokerId", "default": "-1",

Review Comment:
   You've got "default" twice here



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -176,13 +190,13 @@ class BrokerToControllerChannelManagerImpl(
   }
 
   private[server] def newRequestThread = {
-    val networkClient = {
+    def networkClient(controllerInfo: ControllerInformation) = {

Review Comment:
   Can we name this "buildNetworkClient" or "createNetworkClient" to make it 
clear we are actually making a new one?



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -276,17 +290,38 @@ case class BrokerToControllerQueueItem(
 )
 
 class BrokerToControllerRequestThread(
-  networkClient: KafkaClient,
+  networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
   config: KafkaConfig,
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, 
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, 
Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, 
isInterruptible = false) {
+
+  var isZkController = false

Review Comment:
   Let's pick a more descriptive name, maybe "isNetworkClientForZkController" 
or something, or we could cache the ControllerInformation instead of just this 
boolean



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1318,14 +1317,20 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     trace("Sending topic metadata %s and brokers %s for correlation id %d to 
client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, 
request.header.clientId))
+    val controllerId = {
+      metadataCache.getControllerId.flatMap {
+        case ZkCachedControllerId(id) => Some(id)
+        case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId

Review Comment:
   See my note in MetadataCache. We could simplify this to 
`metadataCache.getControllerNode.map(_.id)`



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -32,6 +32,13 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, 
Short], epoch: Long)
   }
 }
 
+sealed trait CachedControllerId {

Review Comment:
   A short javadoc might be useful here.



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -147,6 +147,7 @@ class KafkaServer(
 
   var kafkaScheduler: KafkaScheduler = _
 
+  var kraftControllerNodes: Seq[Node] = Seq.empty

Review Comment:
   Let's just initialize this to `_` like the other class members here. It's 
not pretty, but it's better to be consistent.



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -103,13 +110,18 @@ trait MetadataCache {
   def metadataVersion(): MetadataVersion
 
   def features(): FinalizedFeaturesAndEpoch
+
+  def getRandomAliveBrokerId: Option[Int]
 }
 
 object MetadataCache {
   def zkMetadataCache(brokerId: Int,
                       metadataVersion: MetadataVersion,
-                      brokerFeatures: BrokerFeatures = 
BrokerFeatures.createEmpty()): ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
+                      brokerFeatures: BrokerFeatures = 
BrokerFeatures.createEmpty(),
+                      kraftControllerNodes: collection.Seq[Node] = null)

Review Comment:
   Can we initialize this as an empty Seq instead of null?



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -248,7 +258,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
   }
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
-    
metadataSnapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
+    val snapshot = metadataSnapshot
+    brokerId match {
+      case id if 
snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id
 == id) =>
+        kraftControllerNodeMap.get(id)
+      case _ => 
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
+    }

Review Comment:
   We should be able to use pattern matching a little more cleanly like:
   
   ```suggestion
       val snapshot = metadataSnapshot
       snapshot.controllerId match {
         case Some(controllerId: KRaftCachedControllerId) => 
kraftControllerNodeMap.get(controllerId.id)
         case _ => 
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
       }
   ```
   
   



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -668,7 +684,7 @@ class KafkaServer(
 
           // 1. Find the controller and establish a connection to it.
           // If the controller id or the broker registration are missing, we 
sleep and retry (if there are remaining retries)
-          metadataCache.getControllerId match {
+          
metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id)
 match {

Review Comment:
   If we leave this as `metadataCache.getControllerId match`, we can just add 
the type to the cases below. E.g., 
   ```
   case Some(controllerId: ZkCachedControllerId) => ...
   ```



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -103,13 +110,18 @@ trait MetadataCache {
   def metadataVersion(): MetadataVersion
 
   def features(): FinalizedFeaturesAndEpoch
+
+  def getRandomAliveBrokerId: Option[Int]

Review Comment:
   Would it make sense to add `def getControllerNode: Option[Node]` here? It 
might allow for less complex usage outside of this class (e.g., maybe we can 
avoid checking KRaft vs. ZK in some cases?)



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -276,17 +290,38 @@ case class BrokerToControllerQueueItem(
 )
 
 class BrokerToControllerRequestThread(
-  networkClient: KafkaClient,
+  networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
   config: KafkaConfig,
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, 
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, 
Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, 
isInterruptible = false) {

Review Comment:
   I wonder if we could do this slightly differently. Instead of passing a 
`null` and using the `maybeResetNetworkClient` call in the class body, could we 
make a companion object that does the initialization and passes in an 
initialized NetworkClient? 
   
   e.g., 
   ```scala
   class BrokerToControllerRequestThread(
      initialNetworkClient: KafkaClient,
      networkClientFactory: ControllerInformation => KafkaClient,
      metadataUpdater: ManualMetadataUpdater,
      controllerNodeProvider: ControllerNodeProvider,
      config: KafkaConfig,
      time: Time,
      threadName: String,
      retryTimeoutMs: Long
   ) extends InterBrokerSendThread(threadName, initialNetworkClient, 
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, time, isInterruptible = false) {
   ```
   
   This way we never have a `null` reference for 
InterBrokerSendThread#networkClient. 



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -60,16 +60,24 @@ trait ZkFinalizedFeatureCache {
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, 
brokerFeatures: BrokerFeatures)
+class ZkMetadataCache(
+  brokerId: Int,
+  metadataVersion: MetadataVersion,
+  brokerFeatures: BrokerFeatures,
+  kraftControllerNodes: Seq[Node] = Seq.empty)

Review Comment:
   Do we need the default value here? (maybe it's there just for tests?)



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -37,42 +38,55 @@ import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
-trait ControllerNodeProvider {
-  def get(): Option[Node]
-  def listenerName: ListenerName
-  def securityProtocol: SecurityProtocol
-  def saslMechanism: String
-}
+case class ControllerInformation(node: Option[Node],
+                                 listenerName: ListenerName,
+                                 securityProtocol: SecurityProtocol,
+                                 saslMechanism: String,
+                                 isZkController: Boolean)

Review Comment:
   nit: prefer the new continuation style:
   
   ```
   class Foo(
     arg1: T,
     arg2: T
   )
   ```



##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -364,11 +399,13 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {

Review Comment:
   Previously, we would only check ControllerNodeProvider for new connection 
information if we received NOT_CONTROLLER or got disconnected. Can we stick with 
this paradigm and only check ControllerNodeProvider in these cases?
   
   If the controller has moved from ZK to ZK, or ZK to KRaft, we'll likely 
learn about it first via NOT_CONTROLLER anyways. 
   
   My main concern is not changing too much behavior in this class. Ideally, in 
non-migration ZK mode, things here will behave exactly the same as they are 
today. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to