cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042763929


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  @volatile private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread()
+  @volatile private var shutdownStarted = false
 
   def start(): Unit = {
     requestThread.start()
+    maybeScheduleReinitializeRequestThread()
   }
 
   def shutdown(): Unit = {
     requestThread.shutdown()
+    shutdownStarted = false
     info(s"Broker to controller channel manager for $channelName shutdown")
   }
 
-  private[server] def newRequestThread = {
+  def maybeScheduleReinitializeRequestThread(): Unit = {
+    // If migration is enabled for zkBroker, then we might see controller change from zk to kraft
+    // and vice-versa. This periodic task takes care of setting the right channel when such
+    // controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {

Review Comment:
   Shouldn't this just be something we check in the `InterBrokerSendThread` 
itself? Having a separate thread to do this seems very messy.
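
   One way to read this suggestion is sketched below: the loop that already sends requests re-checks the controller type at the top of each iteration and swaps its channel state in place, so no separately scheduled task is needed. Every name here (`ControllerKind`, `SendLoop`, `currentController`, `doWork` as modeled) is a hypothetical stand-in for illustration and is not taken from the Kafka code base or from this PR.

```scala
// Hypothetical stand-ins; none of these types exist in Kafka under these names.
sealed trait ControllerKind
case object ZkController extends ControllerKind
case object KRaftController extends ControllerKind

// Minimal model of a send loop that re-checks the controller kind on every
// iteration instead of relying on a separately scheduled task.
// `currentController` is an assumed lookup supplied by the caller.
final class SendLoop(currentController: () => ControllerKind) {
  private var activeKind: ControllerKind = currentController()
  private var reinitCount = 0

  def kind: ControllerKind = activeKind
  def reinitializations: Int = reinitCount

  // One iteration of the loop body; a real thread would call this repeatedly.
  def doWork(): Unit = {
    val observed = currentController()
    if (observed != activeKind) {
      // Swap the channel state on the sending thread itself, so no other
      // thread has to coordinate with it.
      activeKind = observed
      reinitCount += 1
    }
    // ... poll the network client and send queued requests here ...
  }
}
```

   Because the check and the swap both happen on the sending thread, this shape would not need extra `@volatile` fields to coordinate the switch with a second thread.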


