soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1413201855


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }
+        } else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+          // select random directory if the directory Id is LOST or unknown 
and no offline directories or unassigned

Review Comment:
   Here too, directory selection isn't random; this comment is misleading.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2643,6 +2645,17 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def getAssignedDirectoryId(delta: TopicsDelta, partition: 
Partition): Option[Uuid] = {
+    for {
+      (topicId, topicDelta) <- delta.changedTopics().asScala.find(_._2.name() 
== partition.topic)
+      if topicDelta != null
+      partitionRegistration = 
topicDelta.partitionChanges().get(partition.partitionId)
+      if partitionRegistration != null
+      localBrokerIndexInReplicas = 
partitionRegistration.replicas.indexOf(localBrokerId)
+      directory = partitionRegistration.directories(localBrokerIndexInReplicas)
+    } yield directory
+  }
+

Review Comment:
   You can avoid all of this and just do `val directoryId = 
Option(newImage.topics().getPartition(topicId, 
partitionId).directory(brokerId))`



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -991,18 +997,27 @@ class LogManager(logDirs: Seq[File],
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should 
be returned or created
    * @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory Id that should host the the 
partition's topic.
+   *                             A random directory will be picked up if it 
None or equal {@link DirectoryId.UNASSIGNED}.

Review Comment:
   This isn't random: the directory is picked following `LogManager.nextLogDirs()`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, 
partitionDirectoryId: Option[Uuid]) = {
+    if (partitionDirectoryId.isDefined) {
+      val topicPartitionActualDirectory = partition.log.flatMap(log => 
logManager.directoryId(log.dir.getParent))
+      if 
(!topicPartitionActualDirectory.exists(partitionDirectoryId.contains)) {
+        topicPartitionActualDirectory
+          .flatMap(uuid => partition.topicId.map(topicId =>
+            directoryEventHandler.handleAssignment(new 
common.TopicIdPartition(topicId, partition.partitionId), uuid)
+          ))

Review Comment:
   It is odd to map and flatMap but then throw away the result. Should this be 
a foreach instead?



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -123,14 +123,19 @@ class LogManager(logDirs: Seq[File],
   }
 
   private val dirLocks = lockLogDirs(liveLogDirs)
-  val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
+  private val directoryIds: mutable.Map[String, Uuid] = 
loadDirectoryIds(liveLogDirs)
+  def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
+
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), 
logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
     (dir, new OffsetCheckpointFile(new File(dir, 
LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
 
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, 
String]()
 
+  def hasOfflineLogDirs(): Boolean = offlineLogDirs.nonEmpty

Review Comment:
   Calling the `offlineLogDirs` function builds a list by creating a set with 
`logDirs` and then removing all of `_liveLogDirs`. Instead of doing that we can 
just compare the number of entries in `logDirs` with `_liveLogDirs`.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  private def mayUpdateTopicAssignment(partition: Partition, 
partitionDirectoryId: Option[Uuid]) = {

Review Comment:
   Did you mean `maybeUpdateTopicAssignment`?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }

Review Comment:
   Don't we need an `else` branch to check if the existing log is in the 
indicated directory?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
+  private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
+    targetLogDirectoryId match {
+      case Some(directoryId) =>
+        if (logManager.onlineLogDirId(directoryId)) {
+          if (logManager.getLog(topicPartition).isEmpty) {
+            createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+          }
+        } else if ((!logManager.onlineLogDirId(directoryId) && 
!logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) {
+          // select random directory if the directory Id is LOST or unknown 
and no offline directories or unassigned
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, None)
+        } else {
+         logger.warn(s"can't create $topicPartition on directory id 
${directoryId}")
+        }
+      case None =>
+        // original flow of creating in random log directory.

Review Comment:
   It's not random; we should remove or change this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to