soarez commented on code in PR #14578:
URL: https://github.com/apache/kafka/pull/14578#discussion_r1371577705


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File],
   def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
 
   /**
-   * Determine directory ID for each directory with a meta.properties.
+   * Determine directory ID for each directory.
   * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
+   * Directories without a meta.properties file, which can only occur in ZK mode, will still have a directory ID assigned.
+   * The ID will be written when the new meta.properties file is first written.
    */
   private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+    val s = scala.collection.mutable.Set[Uuid]()
     dirs.flatMap { dir =>
       try {
        val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
-        metadataCheckpoint.read().map { props =>
-          val rawMetaProperties = new RawMetaProperties(props)
-          val uuid = rawMetaProperties.directoryId match {
-            case Some(uuidStr) => Uuid.fromString(uuidStr)
-            case None =>
-              val uuid = Uuid.randomUuid()
-              rawMetaProperties.directoryId = uuid.toString
-              metadataCheckpoint.write(rawMetaProperties.props)
-              uuid
+        val uuid = metadataCheckpoint.read() match {
+          case Some(props) => {
+            val rawMetaProperties = new RawMetaProperties(props)
+            val uuid_from_properties = rawMetaProperties.directoryId match {
+              case Some(uuidStr) => Uuid.fromString(uuidStr)
+              case None =>
+                val uuid_new = Uuid.randomUuid()
+                rawMetaProperties.directoryId = uuid_new.toString
+                metadataCheckpoint.write(rawMetaProperties.props)
+                uuid_new
+            }
+            uuid_from_properties
+          }
+          case None => {
+            Uuid.randomUuid()
           }
-          dir.getAbsolutePath -> uuid
-        }.toMap
+        }
+        if (s.contains(uuid)) {
+            throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}")
+        }

Review Comment:
   An alternative to maintaining a mutable set and modifying it from within the flatMap lambda could be to compare the size of the result with `.toSet.size` or `.toSeq.distinct.size`, e.g.
   
   ```scala
   if (dirIds.values.toSet.size != dirIds.values.size)
     throw new RuntimeException("Found duplicate directory.id values")
   ```
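   
   For context, a rough sketch of how that check could sit after the map is built, with no mutable state inside the lambda. The shape of `directoryIds` here and the placeholder body are illustrative only, not the PR's actual code:
   
   ```scala
   import java.io.File
   import org.apache.kafka.common.Uuid
   
   // Hypothetical standalone sketch: build the map first, then validate
   // uniqueness over the completed values.
   def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
     val dirIds: Map[String, Uuid] = dirs.flatMap { dir =>
       // ... resolve the ID from meta.properties or generate one, as in the diff ...
       Some(dir.getAbsolutePath -> Uuid.randomUuid()) // placeholder only
     }.toMap
     // A Map cannot hold duplicate keys, so only the values need checking.
     if (dirIds.values.toSet.size != dirIds.values.size)
       throw new RuntimeException("Found duplicate directory.id values")
     dirIds
   }
   ```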



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File],
   def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
 
   /**
-   * Determine directory ID for each directory with a meta.properties.
+   * Determine directory ID for each directory.
   * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
+   * Directories without a meta.properties file, which can only occur in ZK mode, will still have a directory ID assigned.
+   * The ID will be written when the new meta.properties file is first written.
    */
   private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+    val s = scala.collection.mutable.Set[Uuid]()
     dirs.flatMap { dir =>
       try {
        val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
-        metadataCheckpoint.read().map { props =>
-          val rawMetaProperties = new RawMetaProperties(props)
-          val uuid = rawMetaProperties.directoryId match {
-            case Some(uuidStr) => Uuid.fromString(uuidStr)
-            case None =>
-              val uuid = Uuid.randomUuid()
-              rawMetaProperties.directoryId = uuid.toString
-              metadataCheckpoint.write(rawMetaProperties.props)
-              uuid
+        val uuid = metadataCheckpoint.read() match {
+          case Some(props) => {
+            val rawMetaProperties = new RawMetaProperties(props)
+            val uuid_from_properties = rawMetaProperties.directoryId match {
+              case Some(uuidStr) => Uuid.fromString(uuidStr)
+              case None =>
+                val uuid_new = Uuid.randomUuid()
+                rawMetaProperties.directoryId = uuid_new.toString
+                metadataCheckpoint.write(rawMetaProperties.props)
+                uuid_new
+            }
+            uuid_from_properties
+          }
+          case None => {
+            Uuid.randomUuid()
           }
-          dir.getAbsolutePath -> uuid
-        }.toMap
+        }
+        if (s.contains(uuid)) {
+            throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}")
+        }

Review Comment:
   Perhaps `KafkaException` is more suitable than `RuntimeException`?
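   
   For illustration, the check from the diff with that substitution, assuming `org.apache.kafka.common.KafkaException` is the class meant here:
   
   ```scala
   import org.apache.kafka.common.KafkaException
   
   if (s.contains(uuid)) {
     // Same check as in the diff, with a Kafka-specific exception type
     throw new KafkaException(s"Found duplicate directory.ids ${uuid.toString}")
   }
   ```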



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File],
   def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
 
   /**
-   * Determine directory ID for each directory with a meta.properties.
+   * Determine directory ID for each directory.
   * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
-   * Directories without a meta.properties don't get a directory ID assigned.
+   * Directories without a meta.properties file, which can only occur in ZK mode, will still have a directory ID assigned.
+   * The ID will be written when the new meta.properties file is first written.
    */
   private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
+    val s = scala.collection.mutable.Set[Uuid]()
     dirs.flatMap { dir =>
       try {
        val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
-        metadataCheckpoint.read().map { props =>
-          val rawMetaProperties = new RawMetaProperties(props)
-          val uuid = rawMetaProperties.directoryId match {
-            case Some(uuidStr) => Uuid.fromString(uuidStr)
-            case None =>
-              val uuid = Uuid.randomUuid()
-              rawMetaProperties.directoryId = uuid.toString
-              metadataCheckpoint.write(rawMetaProperties.props)
-              uuid
+        val uuid = metadataCheckpoint.read() match {
+          case Some(props) => {
+            val rawMetaProperties = new RawMetaProperties(props)
+            val uuid_from_properties = rawMetaProperties.directoryId match {
+              case Some(uuidStr) => Uuid.fromString(uuidStr)
+              case None =>
+                val uuid_new = Uuid.randomUuid()
+                rawMetaProperties.directoryId = uuid_new.toString
+                metadataCheckpoint.write(rawMetaProperties.props)
+                uuid_new
+            }
+            uuid_from_properties
+          }
+          case None => {
+            Uuid.randomUuid()

Review Comment:
   I'm not sure about this, maybe I'm missing something. If meta.properties does not exist for some directory, we generate a random ID, but it seems the generated value isn't persisted? That means the same log directory will get a different ID every time the broker restarts, which will generate a lot of unnecessary reassignments...
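   
   For example, the `None` branch could write the freshly generated ID back out, mirroring what the `Some` branch already does. A rough, untested sketch reusing the names from the diff, and assuming `RawMetaProperties` can be built from an empty `Properties`:
   
   ```scala
   case None =>
     // Hypothetical fix: persist the generated ID immediately so the
     // directory keeps the same ID across broker restarts.
     val uuid = Uuid.randomUuid()
     val rawMetaProperties = new RawMetaProperties(new java.util.Properties())
     rawMetaProperties.directoryId = uuid.toString
     metadataCheckpoint.write(rawMetaProperties.props)
     uuid
   ```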



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
