hachikuji commented on a change in pull request #10135: URL: https://github.com/apache/kafka/pull/10135#discussion_r592887105
##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -72,11 +54,9 @@ class ForwardingManagerImpl(
 ) extends ForwardingManager with Logging {

   override def start(): Unit = {

Review comment:
       Similarly, we don't need these if they are not doing anything.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -101,11 +82,9 @@ class DefaultAutoTopicCreationManager(
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())

   override def start(): Unit = {

Review comment:
       It seems like we don't have a strong need for this anymore. We could probably get rid of `shutdown` as well since there's probably not a strong reason to clear the inflight topics if the manager won't be reused anyway.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -256,13 +258,22 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

+        /* start forwarding manager */
         if (enableForwarding) {
+          clientToControllerChannelManager =
+            Some(
+              BrokerToControllerChannelManager(
+                controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+                time = time,
+                metrics = metrics,
+                config = config,
+                channelName = "clientToControllerChannel",
+                threadNamePrefix = threadNamePrefix,
+                retryTimeoutMs = config.requestTimeoutMs.longValue)
+            )
+          clientToControllerChannelManager.get.start()
           this.forwardingManager = Some(ForwardingManager(
-            config,
-            metadataCache,
-            time,
-            metrics,
-            threadNamePrefix
+            clientToControllerChannelManager.get

Review comment:
       nit: this `get` here is a little ugly. We could create a local `val` when we construct `BrokerToControllerChannelManager`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
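
Editorial illustration of the last nit (the local `val` in place of `Option#get`), as a minimal sketch rather than the change that was actually merged. `ChannelManager`, `Forwarder`, and `Server` are hypothetical stand-ins for `BrokerToControllerChannelManager`, `ForwardingManager`, and `KafkaServer`; their constructors and methods are assumptions made only to keep the snippet self-contained.

```scala
// Hypothetical stand-in for BrokerToControllerChannelManager (not the Kafka class).
final class ChannelManager(val channelName: String) {
  @volatile private var started = false
  def start(): Unit = { started = true }
  def isStarted: Boolean = started
}

// Hypothetical stand-in for ForwardingManager: it only keeps the channel manager it was given.
final class Forwarder(val channelManager: ChannelManager)

// Hypothetical stand-in for the relevant part of KafkaServer.startup().
final class Server(enableForwarding: Boolean) {
  var clientToControllerChannelManager: Option[ChannelManager] = None
  var forwardingManager: Option[Forwarder] = None

  def startup(): Unit = {
    if (enableForwarding) {
      // Bind the freshly built manager to a local val first,
      // so neither start() nor the Forwarder needs Option#get.
      val channelManager = new ChannelManager("clientToControllerChannel")
      channelManager.start()
      clientToControllerChannelManager = Some(channelManager)
      forwardingManager = Some(new Forwarder(channelManager))
    }
  }
}
```

The field still ends up holding `Some(channelManager)` for later shutdown, but the two uses inside `startup()` work with a plain reference, so there is no call that could throw on an empty `Option`.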