hachikuji commented on a change in pull request #10135:
URL: https://github.com/apache/kafka/pull/10135#discussion_r592887105



##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -72,11 +54,9 @@ class ForwardingManagerImpl(
 ) extends ForwardingManager with Logging {
 
   override def start(): Unit = {

Review comment:
       Similarly, we don't need these overrides if they are not doing anything.

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -101,11 +82,9 @@ class DefaultAutoTopicCreationManager(
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
   override def start(): Unit = {

Review comment:
       It seems like we no longer have a strong need for `start`. We could 
probably get rid of `shutdown` as well, since there is little reason to 
clear the inflight topics if the manager won't be reused anyway.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -256,13 +258,22 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
+        /* start forwarding manager */
         if (enableForwarding) {
+          clientToControllerChannelManager =
+            Some(
+              BrokerToControllerChannelManager(
+                controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+                time = time,
+                metrics = metrics,
+                config = config,
+                channelName = "clientToControllerChannel",
+                threadNamePrefix = threadNamePrefix,
+                retryTimeoutMs = config.requestTimeoutMs.longValue)
+            )
+          clientToControllerChannelManager.get.start()
           this.forwardingManager = Some(ForwardingManager(
-            config,
-            metadataCache,
-            time,
-            metrics,
-            threadNamePrefix
+            clientToControllerChannelManager.get

Review comment:
       nit: this `get` is a little ugly. We could bind the 
`BrokerToControllerChannelManager` to a local `val` when we construct it 
and pass that `val` here instead.
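
       To illustrate the local `val` idea, here is a rough sketch. It uses 
stand-in types (`ChannelManager`, `Forwarder`, `Server`) that are hypothetical 
and only mimic the shape of the code in this hunk; they are not the real 
Kafka classes or constructors.

```scala
// Stand-in types for illustration only; NOT the real Kafka classes.
final class ChannelManager(val channelName: String) {
  private var started = false
  def start(): Unit = { started = true }
  def isStarted: Boolean = started
}

final class Forwarder(val channelManager: ChannelManager)

final class Server(enableForwarding: Boolean) {
  var clientToControllerChannelManager: Option[ChannelManager] = None
  var forwardingManager: Option[Forwarder] = None

  def startup(): Unit = {
    if (enableForwarding) {
      // Bind the freshly built manager to a local val so that
      // no Option#get call is needed further down.
      val channelManager = new ChannelManager("clientToControllerChannel")
      channelManager.start()
      clientToControllerChannelManager = Some(channelManager)
      forwardingManager = Some(new Forwarder(channelManager))
    }
  }
}
```

The field is still assigned as an `Option`, so shutdown code that iterates 
over it would keep working, while the construction path never has to unwrap 
it.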



