cmccabe commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042763929
##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -164,25 +184,63 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val requestThread = newRequestThread
+  @volatile private var isZkControllerThread = true
+  @volatile private var requestThread = newRequestThread()
+  @volatile private var shutdownStarted = false

   def start(): Unit = {
     requestThread.start()
+    maybeScheduleReinitializeRequestThread()
   }

   def shutdown(): Unit = {
     requestThread.shutdown()
+    shutdownStarted = false
     info(s"Broker to controller channel manager for $channelName shutdown")
   }

-  private[server] def newRequestThread = {
+  def maybeScheduleReinitializeRequestThread(): Unit = {
+    // If migration is enabled for zkBroker, then we might see controller change from zk to kraft
+    // and vice-versa. This periodic task takes care of setting the right channel when such
+    // controller change is noticed.
+    if (config.migrationEnabled && config.requiresZookeeper) {

Review Comment:
   Shouldn't this just be something we check in the `InterBrokerSendThread` itself? Having a separate thread to do this seems very messy.
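
   One possible reading of the suggestion above, as a minimal self-contained sketch: the controller-type check happens inside the send thread's own work loop, so no separate periodic task is needed to swap the channel. Every name here (`ControllerKind`, `SketchClient`, `SketchSendThread`, `currentKind`) is hypothetical and is not Kafka's actual API; the real `InterBrokerSendThread` may need a different shape.

```scala
// Hypothetical sketch only: these types stand in for Kafka's real classes.
sealed trait ControllerKind
case object ZkController extends ControllerKind
case object KRaftController extends ControllerKind

// Stand-in for a network client bound to one kind of controller.
final class SketchClient(val kind: ControllerKind) {
  def poll(): String = s"polled $kind"
}

// Stand-in for the send thread. The controller-kind check lives in doWork(),
// so the client is rebuilt on the same thread that uses it.
final class SketchSendThread(currentKind: () => ControllerKind) {
  private var client = new SketchClient(currentKind())
  private var reinitCount = 0

  def reinitializations: Int = reinitCount

  // One iteration of the thread's loop.
  def doWork(): String = {
    val kind = currentKind()
    if (kind != client.kind) {
      // The controller moved between ZK and KRaft: swap the client in place.
      client = new SketchClient(kind)
      reinitCount += 1
    }
    client.poll()
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    var kind: ControllerKind = ZkController
    val thread = new SketchSendThread(() => kind)
    println(thread.doWork())
    kind = KRaftController // simulate a controller change
    println(thread.doWork())
    println(thread.reinitializations)
  }
}
```

   Because the check and the swap run on the thread that owns the client, this shape would not need the `@volatile var requestThread` handoff or a scheduled reinitialization task. Whether that holds for the real code depends on details not shown in this hunk.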