soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should 
be returned or created
    * @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory Id that should host the 
partition's topic.
+   *                             The next selected directory will be picked up 
if it is None or equal to {@link DirectoryId.UNASSIGNED}.
+   *                             The method assumes the provided Id belongs to 
an online directory.
    * @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
    * @throws InconsistentTopicIdException if the topic ID in the log does not 
match the topic ID provided
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false,
+                     topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] 
= Option.empty): UnifiedLog = {
     logCreationOrDeletionLock synchronized {
       val log = getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Can not create log for 
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are 
offline")
 
         val logDirs: List[File] = {
-          val preferredLogDir = preferredLogDirs.get(topicPartition)
+          val preferredLogDir = targetLogDirectoryId.filterNot(_ == 
DirectoryId.UNASSIGNED) match {
+            case Some(targetId) if 
!preferredLogDirs.containsKey(topicPartition) =>
+              // If partition is configured with both targetLogDirectoryId and 
preferredLogDirs, then
+              // preferredLogDirs will be respected, otherwise 
targetLogDirectoryId will be respected
+              directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)

Review Comment:
   This makes sense. The preferredLogDir is set by an admin, and if that exists 
it should take precedence. 👍 



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should 
be returned or created
    * @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory Id that should host the 
partition's topic.
+   *                             The next selected directory will be picked up 
if it is None or equal to {@link DirectoryId.UNASSIGNED}.
+   *                             The method assumes the provided Id belongs to 
an online directory.
    * @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
    * @throws InconsistentTopicIdException if the topic ID in the log does not 
match the topic ID provided
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false,
+                     topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] 
= Option.empty): UnifiedLog = {
     logCreationOrDeletionLock synchronized {
       val log = getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Can not create log for 
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are 
offline")
 
         val logDirs: List[File] = {
-          val preferredLogDir = preferredLogDirs.get(topicPartition)
+          val preferredLogDir = targetLogDirectoryId.filterNot(_ == 
DirectoryId.UNASSIGNED) match {
+            case Some(targetId) if 
!preferredLogDirs.containsKey(topicPartition) =>
+              // If partition is configured with both targetLogDirectoryId and 
preferredLogDirs, then
+              // preferredLogDirs will be respected, otherwise 
targetLogDirectoryId will be respected
+              directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)

Review Comment:
   This makes sense. The preferredLogDir is set by an admin, and if that exists 
it should take precedence. 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to