soarez commented on code in PR #14578: URL: https://github.com/apache/kafka/pull/14578#discussion_r1371577705
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File], def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) /** - * Determine directory ID for each directory with a meta.properties. + * Determine directory ID for each directory. * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. + * Directories without a meta.properties file, which can only occur in Zk mode, will file have a directory ID assigned. + * The ID will be written when the new meta.properties file is first written. */ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { + val s = scala.collection.mutable.Set[Uuid]() dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) - metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { - case Some(uuidStr) => Uuid.fromString(uuidStr) - case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid + val uuid = metadataCheckpoint.read() match { + case Some(props) => { + val rawMetaProperties = new RawMetaProperties(props) + val uuid_from_properties = rawMetaProperties.directoryId match { + case Some(uuidStr) => Uuid.fromString(uuidStr) + case None => + val uuid_new = Uuid.randomUuid() + rawMetaProperties.directoryId = uuid_new.toString + metadataCheckpoint.write(rawMetaProperties.props) + uuid_new + } + uuid_from_properties + } + case None => { + Uuid.randomUuid() } - dir.getAbsolutePath -> uuid - }.toMap + } + if (s.contains(uuid)) { + throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}") + } Review Comment: An alternative to maintaining a mutable set and modifying 
it from within the flatMap lambda could be to compare the size of the result with `.toSet.size` or `.toSeq.distinct.size` e.g. ```scala if (dirIds.values.toSet.size != dirIds.values.size) throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}") ``` ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File], def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) /** - * Determine directory ID for each directory with a meta.properties. + * Determine directory ID for each directory. * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. + * Directories without a meta.properties file, which can only occur in Zk mode, will file have a directory ID assigned. + * The ID will be written when the new meta.properties file is first written. */ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { + val s = scala.collection.mutable.Set[Uuid]() dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) - metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { - case Some(uuidStr) => Uuid.fromString(uuidStr) - case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid + val uuid = metadataCheckpoint.read() match { + case Some(props) => { + val rawMetaProperties = new RawMetaProperties(props) + val uuid_from_properties = rawMetaProperties.directoryId match { + case Some(uuidStr) => Uuid.fromString(uuidStr) + case None => + val uuid_new = Uuid.randomUuid() + rawMetaProperties.directoryId = uuid_new.toString + metadataCheckpoint.write(rawMetaProperties.props) + uuid_new + } + uuid_from_properties + 
} + case None => { + Uuid.randomUuid() } - dir.getAbsolutePath -> uuid - }.toMap + } + if (s.contains(uuid)) { + throw new RuntimeException(s"Found duplicate directory.ids ${uuid.toString}") + } Review Comment: Perhaps `KafkaException` is more suitable than `RuntimeException`? ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -270,26 +270,38 @@ class LogManager(logDirs: Seq[File], def directoryId(dir: String): Option[Uuid] = dirIds.get(dir) /** - * Determine directory ID for each directory with a meta.properties. + * Determine directory ID for each directory. * If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties. - * Directories without a meta.properties don't get a directory ID assigned. + * Directories without a meta.properties file, which can only occur in Zk mode, will file have a directory ID assigned. + * The ID will be written when the new meta.properties file is first written. */ private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = { + val s = scala.collection.mutable.Set[Uuid]() dirs.flatMap { dir => try { val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)) - metadataCheckpoint.read().map { props => - val rawMetaProperties = new RawMetaProperties(props) - val uuid = rawMetaProperties.directoryId match { - case Some(uuidStr) => Uuid.fromString(uuidStr) - case None => - val uuid = Uuid.randomUuid() - rawMetaProperties.directoryId = uuid.toString - metadataCheckpoint.write(rawMetaProperties.props) - uuid + val uuid = metadataCheckpoint.read() match { + case Some(props) => { + val rawMetaProperties = new RawMetaProperties(props) + val uuid_from_properties = rawMetaProperties.directoryId match { + case Some(uuidStr) => Uuid.fromString(uuidStr) + case None => + val uuid_new = Uuid.randomUuid() + rawMetaProperties.directoryId = uuid_new.toString + metadataCheckpoint.write(rawMetaProperties.props) + uuid_new + } + 
uuid_from_properties + } + case None => { + Uuid.randomUuid() Review Comment: I'm not sure about this, maybe I'm missing something. So if meta.properties does not exist for some directory, we generate a random one, but it seems the generated value isn't persisted? That means the same log directory will get a different ID every time the broker restarts, which will generate a lot of unnecessary reassignments... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org