mumrah commented on a change in pull request #10039:
URL: https://github.com/apache/kafka/pull/10039#discussion_r571268386



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+    startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))

Review comment:
       We are able to load the topic configs right away because they are coming from ZK, right?
   
   

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {
+    startupWithTopicLogConfigOverrides(generateTopicLogConfigs(retrieveTopicNames))
+  }
+
+  // visible for testing
+  private[log] def generateTopicLogConfigs(topicNames: Set[String]): Map[String, LogConfig] = {
+    val topicLogConfigs: mutable.Map[String, LogConfig] = mutable.Map()

Review comment:
       nit: you can move the type info to the right-hand side and just have `val topicLogConfigs = ...`
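   A minimal sketch of what the nit is suggesting. `LogConfig` here is a hypothetical stand-in case class (not the real `kafka.log.LogConfig`) so that the snippet compiles on its own, and the `TypeOnRhs` wrapper object and the two value names are illustrative only:

```scala
import scala.collection.mutable

// Hypothetical stand-in for kafka.log.LogConfig, only so this compiles standalone.
final case class LogConfig(retentionMs: Long)

object TypeOnRhs {
  // Current form in the diff: type annotation on the left-hand side.
  val annotated: mutable.Map[String, LogConfig] = mutable.Map()

  // Suggested form: type arguments on the right-hand side; the inferred
  // type of the val is still mutable.Map[String, LogConfig].
  val inferred = mutable.Map[String, LogConfig]()
}
```

   Both declarations produce an empty mutable map of the same static type; the second just avoids the separate annotation.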

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -403,7 +401,30 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(retrieveTopicNames: => Set[String]): Unit = {

Review comment:
       It feels slightly odd to pass in the set of topics to load here, but I can't think of a good way to avoid it. Perhaps we could pass MetadataCache into LogManager and let startup call MetadataCache#getAllTopics? That might be riskier, though, since it changes the startup order in KafkaServer. Maybe we can look into this as a follow-up.
   
   Besides that, the name here seems strange. Maybe something like "topicsToLoad"?
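   For context on the `=> Set[String]` signature, here is a small sketch of how a Scala by-name parameter behaves. The `ByNameDemo` object, the `fetchTopics` helper, and the `lookups` counter are hypothetical and exist only for illustration; they are not part of LogManager:

```scala
object ByNameDemo {
  // Counts how many times the (hypothetical) topic lookup actually runs.
  var lookups = 0

  // Hypothetical stand-in for whatever supplies the topic names.
  def fetchTopics(): Set[String] = {
    lookups += 1
    Set("orders", "payments")
  }

  // By-name parameter: the argument expression is not evaluated at the call
  // site; it is evaluated each time `topicsToLoad` is referenced in the body.
  def startup(topicsToLoad: => Set[String]): Int = {
    val topics = topicsToLoad // the lookup runs here, exactly once
    topics.size
  }
}
```

   Calling `ByNameDemo.startup(ByNameDemo.fetchTopics())` defers the lookup until the body reads the parameter, which is one reason a by-name parameter can stand in for passing a whole component into the method.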

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -352,13 +348,15 @@ class LogManager(logDirs: Seq[File],
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
-        val jobsForDir = logsToLoad.map { logDir =>
+        val jobsForDir = logsToLoad
+          .filter(logDir => Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)

Review comment:
       Will the metadata topic get passed into LogManager? I would guess not, since it's a Raft topic and not a regular Kafka topic.
   
   Also, style nit: you can do `logsToLoad.filter { logDir => ... }`, similar to the `map` below.
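   A rough sketch of the brace style for the filter. Everything here is simplified and hypothetical: `MetadataTopic` is a placeholder value rather than the real `KafkaRaftServer.MetadataTopic`, `topicOf` is a toy replacement for `Log.parseTopicPartitionName`, and log directories are plain strings instead of `File`s:

```scala
object FilterStyle {
  // Placeholder only; not the actual value of KafkaRaftServer.MetadataTopic.
  val MetadataTopic = "placeholder-metadata"

  // Toy replacement for Log.parseTopicPartitionName(...).topic: assumes
  // directory names look like "<topic>-<partition>".
  def topicOf(dirName: String): String = {
    val i = dirName.lastIndexOf('-')
    if (i < 0) dirName else dirName.substring(0, i)
  }

  // filter and map both use the `{ logDir => ... }` form.
  def jobsFor(logsToLoad: Seq[String]): Seq[String] =
    logsToLoad
      .filter { logDir => topicOf(logDir) != MetadataTopic }
      .map { logDir => s"load:$logDir" }
}
```

   The behavior is the same as with the parenthesized lambda; only the formatting changes so that the `filter` and `map` calls read consistently.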




