soarez commented on code in PR #14881: URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File], * @param isNew Whether the replica should have existed on the broker or not * @param isFuture True if the future log of the specified partition should be returned or created * @param topicId The topic ID of the partition's topic + * @param targetLogDirectoryId The directory Id that should host the the partition's topic. + * The next selected directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}. + * The method assumes provided Id belong to online directory. * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker * @throws InconsistentTopicIdException if the topic ID in the log does not match the topic ID provided */ - def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = { + def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, + topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = Option.empty): UnifiedLog = { logCreationOrDeletionLock synchronized { val log = getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { - val preferredLogDir = preferredLogDirs.get(topicPartition) + val preferredLogDir = targetLogDirectoryId.filterNot(_ == DirectoryId.UNASSIGNED) match { + case Some(targetId) if !preferredLogDirs.containsKey(topicPartition) => + // If partition is configured with both targetLogDirectoryId and preferredLogDirs, then + // preferredLogDirs will be respected, otherwise targetLogDirectoryId will be respected + directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null) Review Comment: This makes sense. The preferredLogDir is set by an admin, and if that exists it should take precedence. 👍 ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File], * @param isNew Whether the replica should have existed on the broker or not * @param isFuture True if the future log of the specified partition should be returned or created * @param topicId The topic ID of the partition's topic + * @param targetLogDirectoryId The directory Id that should host the the partition's topic. + * The next selected directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}. + * The method assumes provided Id belong to online directory. * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker * @throws InconsistentTopicIdException if the topic ID in the log does not match the topic ID provided */ - def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = { + def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, + topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = Option.empty): UnifiedLog = { logCreationOrDeletionLock synchronized { val log = getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { - val preferredLogDir = preferredLogDirs.get(topicPartition) + val preferredLogDir = targetLogDirectoryId.filterNot(_ == DirectoryId.UNASSIGNED) match { + case Some(targetId) if !preferredLogDirs.containsKey(topicPartition) => + // If partition is configured with both targetLogDirectoryId and preferredLogDirs, then + // preferredLogDirs will be respected, otherwise targetLogDirectoryId will be respected + directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null) Review Comment: This makes sense. The preferredLogDir is set by an admin, and if that exists it should take precedence. 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org