soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
* @param isNew Whether the replica should have existed on the broker or not
* @param isFuture True if the future log of the specified partition should
be returned or created
* @param topicId The topic ID of the partition's topic
+ * @param targetLogDirectoryId The directory Id that should host the
partition's topic.
+ * The next selected directory will be picked up
if it is None or equal to {@link DirectoryId.UNASSIGNED}.
+ * The method assumes the provided Id belongs to
an online directory.
* @throws KafkaStorageException if isNew=false, log is not found in the
cache and there is offline log directory on the broker
* @throws InconsistentTopicIdException if the topic ID in the log does not
match the topic ID provided
*/
- def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false,
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+ def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false,
isFuture: Boolean = false,
+ topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]
= Option.empty): UnifiedLog = {
logCreationOrDeletionLock synchronized {
val log = getLog(topicPartition, isFuture).getOrElse {
// create the log if it has not already been created in another thread
if (!isNew && offlineLogDirs.nonEmpty)
throw new KafkaStorageException(s"Can not create log for
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are
offline")
val logDirs: List[File] = {
- val preferredLogDir = preferredLogDirs.get(topicPartition)
+ val preferredLogDir = targetLogDirectoryId.filterNot(_ ==
DirectoryId.UNASSIGNED) match {
+ case Some(targetId) if
!preferredLogDirs.containsKey(topicPartition) =>
+ // If partition is configured with both targetLogDirectoryId and
preferredLogDirs, then
+ // preferredLogDirs will be respected, otherwise
targetLogDirectoryId will be respected
+ directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)
Review Comment:
This makes sense. The preferredLogDir is set by an admin, and if that exists
it should take precedence. 👍
##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
* @param isNew Whether the replica should have existed on the broker or not
* @param isFuture True if the future log of the specified partition should
be returned or created
* @param topicId The topic ID of the partition's topic
+ * @param targetLogDirectoryId The directory Id that should host the
partition's topic.
+ * The next selected directory will be picked up
if it is None or equal to {@link DirectoryId.UNASSIGNED}.
+ * The method assumes the provided Id belongs to
an online directory.
* @throws KafkaStorageException if isNew=false, log is not found in the
cache and there is offline log directory on the broker
* @throws InconsistentTopicIdException if the topic ID in the log does not
match the topic ID provided
*/
- def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false,
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+ def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false,
isFuture: Boolean = false,
+ topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]
= Option.empty): UnifiedLog = {
logCreationOrDeletionLock synchronized {
val log = getLog(topicPartition, isFuture).getOrElse {
// create the log if it has not already been created in another thread
if (!isNew && offlineLogDirs.nonEmpty)
throw new KafkaStorageException(s"Can not create log for
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are
offline")
val logDirs: List[File] = {
- val preferredLogDir = preferredLogDirs.get(topicPartition)
+ val preferredLogDir = targetLogDirectoryId.filterNot(_ ==
DirectoryId.UNASSIGNED) match {
+ case Some(targetId) if
!preferredLogDirs.containsKey(topicPartition) =>
+ // If partition is configured with both targetLogDirectoryId and
preferredLogDirs, then
+ // preferredLogDirs will be respected, otherwise
targetLogDirectoryId will be respected
+ directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)
Review Comment:
This makes sense. The preferredLogDir is set by an admin, and if that exists
it should take precedence. 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]