showuon commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1470557475
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
Yes, I agree with @OmniaGM that we don't need to branch if/else here since we already do the handling in `createLogIfNotExists`. So we only need to pass in `partitionState.isNew` correctly here. Does that make sense?
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########

Review Comment:
Also, before this change, we always treated the log as `new` if `directoryId == DirectoryId.UNASSIGNED`. But now, it needs to be `directoryId == DirectoryId.UNASSIGNED && partitionState.isNew`. Will that cause other issues, @OmniaGM?
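
To make the question concrete, here is a minimal, standalone sketch (not the Kafka code) that enumerates when the old and new values of the `isNew` argument differ. The object name `IsNewComparison`, the string-typed directory ids, and the `Unassigned` placeholder are all hypothetical stand-ins for `DirectoryId.UNASSIGNED` and `partitionState.isNew`; only the two boolean expressions come from the diff.

```scala
// Hypothetical, simplified model of the `isNew` argument passed to
// createLogIfNotExists before and after the change in this PR.
object IsNewComparison {
  // Placeholder for DirectoryId.UNASSIGNED (a Uuid in Kafka; a String here for illustration).
  val Unassigned: String = "UNASSIGNED"

  // Before the change: the flag depended only on the directory id.
  def isNewBefore(directoryId: String, partitionStateIsNew: Boolean): Boolean =
    directoryId == Unassigned

  // After the change: the flag is taken from the partition state.
  def isNewAfter(directoryId: String, partitionStateIsNew: Boolean): Boolean =
    partitionStateIsNew

  // All (directoryId, partitionState.isNew) combinations where the two versions disagree.
  def disagreements: Seq[(String, Boolean)] = for {
    dir   <- Seq(Unassigned, "some-dir-id")
    isNew <- Seq(true, false)
    if isNewBefore(dir, isNew) != isNewAfter(dir, isNew)
  } yield (dir, isNew)

  def main(args: Array[String]): Unit =
    disagreements.foreach { case (dir, isNew) =>
      println(s"directoryId=$dir partitionState.isNew=$isNew")
    }
}
```

Under these assumptions, the case the question above is about is an unassigned directory id combined with `partitionState.isNew == false`: it used to be treated as new and no longer is.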