soarez commented on code in PR #14881: URL: https://github.com/apache/kafka/pull/14881#discussion_r1413201855
########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition, } } + private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { + targetLogDirectoryId match { + case Some(directoryId) => + if (logManager.onlineLogDirId(directoryId)) { + if (logManager.getLog(topicPartition).isEmpty) { + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + } + } else if ((!logManager.onlineLogDirId(directoryId) && !logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) { + // select random directory if the directory Id is LOST or unknown and no offline directories or unassigned Review Comment: Here too, directory selection isn't random, this is misleading. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -2643,6 +2645,17 @@ class ReplicaManager(val config: KafkaConfig, } } + private def getAssignedDirectoryId(delta: TopicsDelta, partition: Partition): Option[Uuid] = { + for { + (topicId, topicDelta) <- delta.changedTopics().asScala.find(_._2.name() == partition.topic) + if topicDelta != null + partitionRegistration = topicDelta.partitionChanges().get(partition.partitionId) + if partitionRegistration != null + localBrokerIndexInReplicas = partitionRegistration.replicas.indexOf(localBrokerId) + directory = partitionRegistration.directories(localBrokerIndexInReplicas) + } yield directory + } + Review Comment: You can avoid all of this and just do `val directoryId = Option(newImage.topics().getPartition(topicId, partitionId).directory(brokerId))` ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -991,18 +997,27 @@ class LogManager(logDirs: Seq[File], * @param isNew Whether the replica should have existed on the broker or 
not * @param isFuture True if the future log of the specified partition should be returned or created * @param topicId The topic ID of the partition's topic + * @param targetLogDirectoryId The directory Id that should host the the partition's topic. + * A random directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}. Review Comment: This is not random: the directory is selected following `LogManager.nextLogDirs()`. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig, } } + private def mayUpdateTopicAssignment(partition: Partition, partitionDirectoryId: Option[Uuid]) = { + if (partitionDirectoryId.isDefined) { + val topicPartitionActualDirectory = partition.log.flatMap(log => logManager.directoryId(log.dir.getParent)) + if (!topicPartitionActualDirectory.exists(partitionDirectoryId.contains)) { + topicPartitionActualDirectory + .flatMap(uuid => partition.topicId.map(topicId => + directoryEventHandler.handleAssignment(new common.TopicIdPartition(topicId, partition.partitionId), uuid) + )) Review Comment: It is odd to map and flatMap but then throw away the result. Should this be a foreach instead? 
########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -123,14 +123,19 @@ class LogManager(logDirs: Seq[File], } private val dirLocks = lockLogDirs(liveLogDirs) - val directoryIds: Map[String, Uuid] = loadDirectoryIds(liveLogDirs) + private val directoryIds: mutable.Map[String, Uuid] = loadDirectoryIds(liveLogDirs) + def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet + @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]() + def hasOfflineLogDirs(): Boolean = offlineLogDirs.nonEmpty Review Comment: Calling the `offlineLogDirs` function builds a list by creating a set with `logDirs` and then removing all of `_liveLogDirs`. Instead of doing that, we can just compare the number of entries in `logDirs` with `_liveLogDirs`. ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -2741,6 +2755,18 @@ class ReplicaManager(val config: KafkaConfig, } } + private def mayUpdateTopicAssignment(partition: Partition, partitionDirectoryId: Option[Uuid]) = { Review Comment: Did you mean `maybeUpdateTopicAssignment`? 
########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition, } } + private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { + targetLogDirectoryId match { + case Some(directoryId) => + if (logManager.onlineLogDirId(directoryId)) { + if (logManager.getLog(topicPartition).isEmpty) { + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + } Review Comment: Don't we need an `else` branch to check if the existing log is in the indicated directory? ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -867,6 +869,25 @@ class Partition(val topicPartition: TopicPartition, } } + private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { + targetLogDirectoryId match { + case Some(directoryId) => + if (logManager.onlineLogDirId(directoryId)) { + if (logManager.getLog(topicPartition).isEmpty) { + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) + } + } else if ((!logManager.onlineLogDirId(directoryId) && !logManager.hasOfflineLogDirs()) || directoryId == DirectoryId.UNASSIGNED) { + // select random directory if the directory Id is LOST or unknown and no offline directories or unassigned + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, None) + } else { + logger.warn(s"can't create $topicPartition on directory id ${directoryId}") + } + case None => + // original flow of creating in random log directory. Review Comment: It's not random, we should remove or change this comment. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org