Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
ijuma commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1489643087

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
What is the operator supposed to do when they see this warning? Generally, we should be very careful about warning logs - most times they are an anti-pattern as they are scary but not actionable.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon merged PR #15263:
URL: https://github.com/apache/kafka/pull/15263
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on PR #15263:
URL: https://github.com/apache/kafka/pull/15263#issuecomment-1925168574

Failed tests are unrelated.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gaurav-narula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473758133

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
     assertTrue(fooPart eq fooPart2)
     val bar1 = new TopicPartition("bar", 1)
     replicaManager.markPartitionOffline(bar1)
-    assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
+    val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get
+    assertTrue(barNew)
+    assertEquals(bar1, barPart.topicPartition)

Review Comment:
Addressed in [cdf9c0f](https://github.com/apache/kafka/pull/15263/commits/cdf9c0f0cf817e68ca27092addb21308a9bc3762)
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473746536

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
     assertTrue(fooPart eq fooPart2)
     val bar1 = new TopicPartition("bar", 1)
     replicaManager.markPartitionOffline(bar1)
-    assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
+    val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get
+    assertTrue(barNew)
+    assertEquals(bar1, barPart.topicPartition)

Review Comment:
Could we add a test case for the path that returns `None`? After this PR, there are two outcomes when creating a partition that is in the `Offline` state, so we should test both.

## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##
@@ -3805,4 +3805,281 @@ class PartitionTest extends AbstractPartitionTest {
       when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def makeLeaderInvokesgetOrCreateLog_OnOnlineLogDir(isNew: Boolean): Unit = {

Review Comment:
Thanks for adding the tests.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473115063

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
       delta: TopicsDelta,
       topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-          s"with topic id $topicId because it resides in an offline log " +
-          "directory.")
-        None
+      case HostedPartition.Offline(offlinePartition) =>
+        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+          stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+            s"with topic id $topicId because it resides in an offline log " +
+            "directory.")
+          None
+        } else {
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+            s"A topic with the same name but different id exists but it resides in an offline log " +
+            s"directory.")

Review Comment:
Created https://issues.apache.org/jira/browse/KAFKA-16212
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1472753854

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
Addressed in [9678cc9](https://github.com/apache/kafka/pull/15263/commits/9678cc9c19b322232a78e2623bbada6b51e55baa)
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1471733042

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
I don't believe it will cause an issue. We just need to ensure we have good test coverage for this, at least in unit tests, as JBOD doesn't have strong integration tests at the moment.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1471075201

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
       delta: TopicsDelta,
       topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-          s"with topic id $topicId because it resides in an offline log " +
-          "directory.")
-        None
+      case HostedPartition.Offline(offlinePartition) =>
+        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+          stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+            s"with topic id $topicId because it resides in an offline log " +
+            "directory.")
+          None
+        } else {
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+            s"A topic with the same name but different id exists but it resides in an offline log " +
+            s"directory.")

Review Comment:
What would be the impact if we don't fix it? Currently, we're assuming the topic partition with another topic ID is located on a dir with the `None` state. So I think the impact would be:
1. If the partition is on another dir which already has the `Online` state, we will create a duplicate one, or the state will be wrong.
2. If the partition is on another dir which is offline, we'll try to create it but get an exception without a good handler (I think).
3. Anything else?

Given that this case is pretty rare and only happens when the topic name is the same, I agree we can create another JIRA for that improvement.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470869032

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
       delta: TopicsDelta,
       topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-          s"with topic id $topicId because it resides in an offline log " +
-          "directory.")
-        None
+      case HostedPartition.Offline(offlinePartition) =>
+        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+          stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+            s"with topic id $topicId because it resides in an offline log " +
+            "directory.")
+          None
+        } else {
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+            s"A topic with the same name but different id exists but it resides in an offline log " +
+            s"directory.")

Review Comment:
I think the assumption is that `allPartitions` only contains the TopicPartition key corresponding to the "latest" topic ID in case of a conflict. I agree that keying it by `TopicIdPartition` would avoid ambiguity in the long run. Perhaps we should take that up in a future JIRA?
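To make the ambiguity concrete, here is a minimal sketch using simplified, hypothetical stand-ins for `TopicPartition` and `TopicIdPartition` (not Kafka's classes, with `java.util.UUID` in place of Kafka's `Uuid`, and a plain immutable `Map` rather than the real `allPartitions` pool). It shows that a map keyed only by topic name and partition can track just one incarnation of a recreated topic, while a key that includes the topic ID keeps both.

```scala
import java.util.UUID

// Hypothetical stand-ins for Kafka's TopicPartition and TopicIdPartition.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: UUID, topicPartition: TopicPartition)

val oldId = new UUID(0L, 1L) // topic "foo" before deletion, replica on a failed disk
val newId = new UUID(0L, 2L) // topic "foo" after recreation
val foo0 = TopicPartition("foo", 0)

// Keyed by name only: registering the recreated topic overwrites the old entry,
// so only one incarnation of foo-0 can be tracked at a time.
val byName: Map[TopicPartition, String] =
  Map(foo0 -> s"offline:$oldId").updated(foo0, s"online:$newId")

// Keyed by topic ID and name: both incarnations stay distinguishable.
val byIdAndName: Map[TopicIdPartition, String] = Map(
  TopicIdPartition(oldId, foo0) -> "offline",
  TopicIdPartition(newId, foo0) -> "online"
)
```

The name-keyed map ends up with a single entry for `foo0`, which is why the current code has to assume that entry belongs to the latest topic ID.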
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470669307

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
       delta: TopicsDelta,
       topicId: Uuid): Option[(Partition, Boolean)] = {
     getPartition(tp) match {
-      case HostedPartition.Offline =>
-        stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-          s"with topic id $topicId because it resides in an offline log " +
-          "directory.")
-        None
+      case HostedPartition.Offline(offlinePartition) =>
+        if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+          stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+            s"with topic id $topicId because it resides in an offline log " +
+            "directory.")
+          None
+        } else {
+          stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+            s"A topic with the same name but different id exists but it resides in an offline log " +
+            s"directory.")

Review Comment:
Question: from the check `if (offlinePartition.flatMap(p => p.topicId).contains(topicId))`, we can make sure this partition is not in an offline dir, but how could we know whether the partition is in an `Online` dir, a `None` dir, or even in another offline dir? Should we use `TopicIdPartition` as the key for `allPartitions`?
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470557475

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
Yes, I agree with @OmniaGM that we don't need the if/else branch here, since this case is already handled in `createLogIfNotExists`. We only need to pass in `partitionState.isNew` correctly here. Does that make sense?
## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
Also, before this change, we always treated the log as new if `directoryId == DirectoryId.UNASSIGNED`. But now, it needs to be `directoryId == DirectoryId.UNASSIGNED && partitionState.isNew`. Will that cause other issues, @OmniaGM?
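A minimal sketch of how the `isNew` argument passed to `createLogIfNotExists` changes with this diff. It assumes a hypothetical sentinel UUID in place of `DirectoryId.UNASSIGNED` (the real constant's value is not reproduced here), and the two functions are illustrative names, not Kafka methods.

```scala
import java.util.UUID

// Hypothetical sentinel standing in for DirectoryId.UNASSIGNED.
val Unassigned: UUID = new UUID(0L, 0L)

// Before the diff: the log counted as "new" purely because its directory was unassigned.
def isNewBefore(directoryId: UUID, partitionStateIsNew: Boolean): Boolean =
  directoryId == Unassigned

// After the diff: the isNew flag from the partition state decides.
def isNewAfter(directoryId: UUID, partitionStateIsNew: Boolean): Boolean =
  partitionStateIsNew
```

The two versions disagree in two cases: an unassigned directory for a replica that is not new (the case raised in this comment), and an assigned directory for a replica that is new, such as a recreated topic.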
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470001208

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends HostedPartition

Review Comment:
Actually, ignore me. `Partition` wouldn't always be available for the `Offline` class.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469993396

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends HostedPartition

Review Comment:
If you are using `Partition`, then maybe remove the `Option`, as `Partition.topicId` already returns an `Option`.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469986094

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
Addressed in [99dbbaa](https://github.com/apache/kafka/pull/15263/commits/99dbbaa9dbb90affbc56832b9ea2e546c27cb4d1). Used the `Partition` class for consistency. It has a method to retrieve the topicId.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469696139

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
Especially since the signature for `Online` is `final case class Online(partition: Partition)`, so just to keep them somewhat consistent we could have `Offline(topicIdPartition: Option[TopicIdPartition])`. _Note_: this is just a suggestion!
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469691076

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
    * This broker hosts the partition, but it is in an offline log directory.
    */
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
Should this instead be `TopicIdPartition`, as `Offline` here represents a partition and not a topic per se?!
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469680659

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
> Is it because there are potentially offline log dir might already have the logs? If so, does that mean we can't create any new topic while 1 log dir goes offline?

My understanding of the original logic before JBOD is that we block creating a topic partition on a broker if `partitionState.isNew` is false and the broker has any offline log directories. This logic is in `LogManager.getOrCreateLog`, which will throw `KafkaStorageException`. I'm not sure we need to throw it here in `Partition.createLogInAssignedDirectoryId` if it will be thrown anyway in `LogManager.getOrCreateLog`.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gaurav-narula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1467590251

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
IIUC, it's from [KIP-112](https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD)

> The broker will specify error=KafkaStorageException for those partitions that are in the LeaderAndIsrRequest with isNewReplica=False but not found on any good log directory.
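The KIP-112 rule quoted here (implemented, per this discussion, in `LogManager.getOrCreateLog`) can be sketched as a standalone check. `StorageException`, `checkLogCreation`, and the parameter names are hypothetical; the real code throws `KafkaStorageException`. The rationale in the comment is presumably the one raised in the thread: a replica that is not new but is missing from every online directory may have its data on the failed disk.

```scala
// Hypothetical stand-in for org.apache.kafka.common.errors.KafkaStorageException.
final class StorageException(message: String) extends RuntimeException(message)

// Refuse to create an empty log for a replica that the controller says is NOT new,
// when it was not found on any online log directory and some directory is offline:
// its existing data may live on the offline disk.
def checkLogCreation(
    isNewReplica: Boolean,
    foundOnOnlineDir: Boolean,
    offlineLogDirs: Set[String]
): Unit =
  if (!isNewReplica && !foundOnOnlineDir && offlineLogDirs.nonEmpty)
    throw new StorageException(
      s"Replica is not new and not found on any online log directory; offline: ${offlineLogDirs.mkString(", ")}")
```

Under this rule, a new replica can still be created while a disk is offline; only non-new replicas that cannot be located are rejected.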
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1467584667

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
TBH, I don't understand why we should throw an exception [here](https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/core/src/main/scala/kafka/log/LogManager.scala#L1009). Is it because a log dir that is potentially offline might already have the logs? If so, does that mean we can't create any new topic while 1 log dir is offline? Sorry, I'm not quite familiar with this part of the logic.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1466580289

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
     targetLogDirectoryId match {
       case Some(directoryId) =>
-        createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+          createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+        } else {
+          warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
I'm not quite sure if we should also be throwing an exception here.
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gaurav-narula commented on PR #15263:
URL: https://github.com/apache/kafka/pull/15263#issuecomment-1910373586

CC: @OmniaGM @cmccabe @pprovenzano @showuon
[PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
gaurav-narula opened a new pull request, #15263:
URL: https://github.com/apache/kafka/pull/15263

In KRaft mode, the broker fails to handle topic recreation correctly with broken disks. This is because `ReplicaManager` tracks `HostedPartition`s that are on an offline disk but doesn't associate topic ID information with them.

This change updates `HostedPartition.Offline` to carry topic ID information. We also update the log creation logic in `Partition::createLogInAssignedDirectoryId` so that it no longer relies solely on `targetLogDirectoryId == DirectoryId.UNASSIGNED` to determine whether the log to be created is "new".

Please refer to the comments in https://issues.apache.org/jira/browse/KAFKA-16157 for more information.
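A minimal sketch of the decision the PR adds for the `Offline` case of `ReplicaManager.getOrCreatePartition`, using simplified stand-ins rather than Kafka's classes. `java.util.UUID` replaces Kafka's `Uuid`, `Partition` carries only an optional topic ID, and `Outcome` with its three values is a hypothetical type for illustration. The `Online` branch is reduced to reusing the existing partition.

```scala
import java.util.UUID

// Simplified stand-in: only the optional topic id of a partition is modelled.
final case class Partition(topicId: Option[UUID])

sealed trait HostedPartition
object HostedPartition {
  // Mirrors Kafka's HostedPartition.None (it shadows scala.None inside this object).
  case object None extends HostedPartition
  final case class Online(partition: Partition) extends HostedPartition
  // After the change, Offline remembers the partition, and thus its topic id, when known.
  final case class Offline(partition: Option[Partition]) extends HostedPartition
}

// Hypothetical outcomes for this sketch.
sealed trait Outcome
case object ReportOffline extends Outcome // this topic id's replica really sits on a failed disk
case object CreateNew extends Outcome     // no local replica, or only one from an older topic id
case object ReuseExisting extends Outcome

def getOrCreateOutcome(hosted: HostedPartition, topicId: UUID): Outcome = hosted match {
  case HostedPartition.Offline(offlinePartition) =>
    // Same check as the diff: refuse only when the offline replica has the requested topic id.
    if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) ReportOffline
    else CreateNew
  case HostedPartition.Online(_) => ReuseExisting
  case HostedPartition.None      => CreateNew
}
```

An `Offline` entry without a `Partition` object falls through to creating a new partition, because `flatMap(...).contains(topicId)` is false for an empty `Option`.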