Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-02-14 Thread via GitHub


ijuma commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1489643087


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   What is the operator supposed to do when they see this warning? Generally, 
we should be very careful about warning logs - most times they are an 
anti-pattern as they are scary but not actionable.
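   To pin down exactly when the warning fires, here is a minimal Scala sketch of the guard in this hunk, written as a pure predicate. `LogDirState`, `CreateLogGuard` and the `Unassigned` sentinel are hypothetical stand-ins, not Kafka's `LogManager` or `DirectoryId` APIs; the sketch only mirrors the boolean condition shown in the diff.

```scala
import java.util.UUID

// Hypothetical snapshot of the broker's log directories (not Kafka's LogManager).
final case class LogDirState(online: Set[UUID], hasOffline: Boolean)

object CreateLogGuard {
  // Hypothetical sentinel standing in for DirectoryId.UNASSIGNED in this sketch.
  val Unassigned: UUID = new UUID(0L, 0L)

  // Mirrors the diff: create the log if the target directory is online,
  // if no directory is offline, or if no directory has been assigned yet.
  def shouldCreate(dirs: LogDirState, directoryId: UUID): Boolean =
    dirs.online.contains(directoryId) || !dirs.hasOffline || directoryId == Unassigned
}
```

   Creation is skipped only when an assigned target directory is not online while at least one directory is offline; that is the single case in which the warning is logged.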



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-02-02 Thread via GitHub


showuon merged PR #15263:
URL: https://github.com/apache/kafka/pull/15263



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-02-02 Thread via GitHub


showuon commented on PR #15263:
URL: https://github.com/apache/kafka/pull/15263#issuecomment-1925168574

   Failed tests are unrelated.



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


gaurav-narula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473758133


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
   assertTrue(fooPart eq fooPart2)
   val bar1 = new TopicPartition("bar", 1)
   replicaManager.markPartitionOffline(bar1)
-  assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
+  val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get
+  assertTrue(barNew)
+  assertEquals(bar1, barPart.topicPartition)

Review Comment:
   Addressed in 
[cdf9c0f](https://github.com/apache/kafka/pull/15263/commits/cdf9c0f0cf817e68ca27092addb21308a9bc3762)




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473746536


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4912,7 +4919,9 @@ class ReplicaManagerTest {
   assertTrue(fooPart eq fooPart2)
   val bar1 = new TopicPartition("bar", 1)
   replicaManager.markPartitionOffline(bar1)
-  assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
+  val (barPart, barNew) = replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID).get
+  assertTrue(barNew)
+  assertEquals(bar1, barPart.topicPartition)

Review Comment:
   Could we add a test case for the path that returns `None`?
   After this PR there are two cases when creating a partition that is currently `Offline`, so we should test both.
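   A minimal sketch of the two outcomes to cover, modelling only the `HostedPartition.Offline` branch of `getOrCreatePartition` from the diff. `OfflineBranch` and `offlineBranch` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and `Some(true)` stands in for returning a new partition with `isNew = true`.

```scala
import java.util.UUID

object OfflineBranch {
  // offlineTopicId: topic ID of the partition recorded as Offline, if known
  //   (the diff obtains it via offlinePartition.flatMap(p => p.topicId)).
  // requested: topic ID carried by the metadata delta.
  // Returns None when the partition must stay offline, Some(isNew) otherwise.
  def offlineBranch(offlineTopicId: Option[UUID], requested: UUID): Option[Boolean] =
    if (offlineTopicId.contains(requested)) None // same topic: it lives on an offline dir
    else Some(true)                              // recreated topic (or unknown ID): create as new
}
```

   The third assertion-worthy case is an `Offline` entry without a known topic ID: it also falls through to creation, since `contains` on an empty `Option` is false.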



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -3805,4 +3805,281 @@ class PartitionTest extends AbstractPartitionTest {
   when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
 }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def makeLeaderInvokesgetOrCreateLog_OnOnlineLogDir(isNew: Boolean): Unit = {

Review Comment:
   Thanks for adding the tests.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1473115063


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
   delta: TopicsDelta,
   topicId: Uuid): Option[(Partition, Boolean)] = {
 getPartition(tp) match {
-  case HostedPartition.Offline =>
-stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-  s"with topic id $topicId because it resides in an offline log " +
-  "directory.")
-None
+  case HostedPartition.Offline(offlinePartition) =>
+if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+  stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+s"with topic id $topicId because it resides in an offline log " +
+"directory.")
+  None
+} else {
+  stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+s"A topic with the same name but different id exists but it resides in an offline log " +
+s"directory.")

Review Comment:
   Created https://issues.apache.org/jira/browse/KAFKA-16212




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-31 Thread via GitHub


gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1472753854


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   Addressed in 
[9678cc9](https://github.com/apache/kafka/pull/15263/commits/9678cc9c19b322232a78e2623bbada6b51e55baa)




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-30 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1471733042


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   I don't believe it will cause an issue. We just need to ensure good test coverage for this, at least in unit tests, as JBOD doesn't have strong integration tests at the moment.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-30 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1471075201


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
   delta: TopicsDelta,
   topicId: Uuid): Option[(Partition, Boolean)] = {
 getPartition(tp) match {
-  case HostedPartition.Offline =>
-stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-  s"with topic id $topicId because it resides in an offline log " +
-  "directory.")
-None
+  case HostedPartition.Offline(offlinePartition) =>
+if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+  stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+s"with topic id $topicId because it resides in an offline log " +
+"directory.")
+  None
+} else {
+  stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+s"A topic with the same name but different id exists but it resides in an offline log " +
+s"directory.")

Review Comment:
   What would be the impact if we don't fix it?
   Currently, we assume that a topicPartition with another topic ID is located in a dir with no state. So I think the impact would be:
   1. If the partition is in another dir that is already Online, we will create a duplicate, or the state will be wrong.
   2. If the partition is in another dir that is offline, we'll try to create it but get an exception without a good handler (I think).
   3. Anything else?
   
   Given that this case is pretty rare and only happens when the topic name is the same, I agree we can create another JIRA for that improvement.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-30 Thread via GitHub


gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470869032


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
   delta: TopicsDelta,
   topicId: Uuid): Option[(Partition, Boolean)] = {
 getPartition(tp) match {
-  case HostedPartition.Offline =>
-stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-  s"with topic id $topicId because it resides in an offline log " +
-  "directory.")
-None
+  case HostedPartition.Offline(offlinePartition) =>
+if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+  stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+s"with topic id $topicId because it resides in an offline log " +
+"directory.")
+  None
+} else {
+  stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+s"A topic with the same name but different id exists but it resides in an offline log " +
+s"directory.")

Review Comment:
   I think the assumption is that `allPartitions` only contains the TopicPartition key corresponding to the "latest" topic-id in case of a conflict.
   
   I agree, keying it by `TopicIdPartition` would avoid ambiguity in the long run. Perhaps we should take that up in a future JIRA?
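   The ambiguity can be illustrated with a small sketch. `TP` and `TIP` are hypothetical stand-ins for `TopicPartition` and `TopicIdPartition`, and the maps only model `allPartitions`; they are not the broker's actual data structure.

```scala
import java.util.UUID

object PartitionKeying {
  // Hypothetical stand-ins for TopicPartition and TopicIdPartition.
  final case class TP(topic: String, partition: Int)
  final case class TIP(topicId: UUID, tp: TP)

  // Keyed by name only: a later entry for the same name replaces the earlier one.
  def keyedByName(entries: Seq[(TIP, String)]): Map[TP, String] =
    entries.map { case (tip, state) => tip.tp -> state }.toMap

  // Keyed by topic ID as well: both incarnations of the topic remain visible.
  def keyedById(entries: Seq[(TIP, String)]): Map[TIP, String] =
    entries.toMap
}
```

   With name-only keys, just one entry per name survives (the last one inserted), which is the "latest topic-id" assumption; keying by ID keeps the offline old incarnation and the recreated topic apart.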




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470669307


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2759,11 +2761,20 @@ class ReplicaManager(val config: KafkaConfig,
   delta: TopicsDelta,
   topicId: Uuid): Option[(Partition, Boolean)] = {
 getPartition(tp) match {
-  case HostedPartition.Offline =>
-stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
-  s"with topic id $topicId because it resides in an offline log " +
-  "directory.")
-None
+  case HostedPartition.Offline(offlinePartition) =>
+if (offlinePartition.flatMap(p => p.topicId).contains(topicId)) {
+  stateChangeLogger.warn(s"Unable to bring up new local leader $tp " +
+s"with topic id $topicId because it resides in an offline log " +
+"directory.")
+  None
+} else {
+  stateChangeLogger.info(s"Creating new partition $tp with topic id " + s"$topicId." +
+s"A topic with the same name but different id exists but it resides in an offline log " +
+s"directory.")

Review Comment:
   Question: From this check `if (offlinePartition.flatMap(p => p.topicId).contains(topicId))`, we can make sure this partition is not in an offline dir, but how could we know whether the partition is in an Online dir, a None dir, or even another offline dir? Should we use `TopicIdPartition` as the key for `allPartitions`?




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470557475


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   Yes, I agree with @OmniaGM  that we don't need to branch if/else here since 
we already do the handling in `createLogIfNotExists`. So we only need to pass 
in `partitionState.isNew` correctly here. Does that make sense?



##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   Also, before this change, we always treated it as `new` if `directoryId == DirectoryId.UNASSIGNED`. But now, it needs to be `directoryId == DirectoryId.UNASSIGNED && partitionState.isNew`. Will that cause any other issues, @OmniaGM?
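   The behavioural change raised here can be tabulated with a small sketch. `IsNewComparison` is a hypothetical helper: `before` and `after` model only the `isNew` argument passed to `createLogIfNotExists` in the old and new code, assuming the new guard lets creation proceed.

```scala
object IsNewComparison {
  // Before the PR: isNew was derived from the directory assignment alone.
  def before(dirUnassigned: Boolean, stateIsNew: Boolean): Boolean = dirUnassigned

  // After the PR: isNew comes from the LeaderAndIsr partition state.
  def after(dirUnassigned: Boolean, stateIsNew: Boolean): Boolean = stateIsNew

  // Prints every combination and flags the rows where the two versions differ.
  def main(args: Array[String]): Unit =
    for (unassigned <- Seq(true, false); stateIsNew <- Seq(true, false)) {
      val (b, a) = (before(unassigned, stateIsNew), after(unassigned, stateIsNew))
      val marker = if (b != a) "  <- differs" else ""
      println(s"unassigned=$unassigned isNew=$stateIsNew before=$b after=$a$marker")
    }
}
```

   The two differing rows are an unassigned directory with `isNew = false`, which is no longer treated as new, and an assigned directory with `isNew = true`, which now is.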




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1470001208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends HostedPartition

Review Comment:
   Actually, ignore me. A `Partition` won't always be available for the `Offline` class.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469993396


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final case class Offline(topicId: Option[Uuid]) extends HostedPartition
+  final case class Offline(partition: Option[Partition]) extends HostedPartition

Review Comment:
   If you are using `Partition`, then maybe remove the `Option`, as `Partition.topicId` already returns an `Option`.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469986094


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
   Addressed in [99dbbaa](https://github.com/apache/kafka/pull/15263/commits/99dbbaa9dbb90affbc56832b9ea2e546c27cb4d1). Used the `Partition` class for consistency. It has a method to retrieve the topicId.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469696139


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
   Especially since the signature for `Online` is `final case class Online(partition: Partition)`, to keep it somewhat consistent we could have `Offline(topicIdPartition: Option[TopicIdPartition])`.
   
   _Note_: this is just a suggestion! 




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469691076


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -183,7 +183,7 @@ object HostedPartition {
   /**
* This broker hosts the partition, but it is in an offline log directory.
*/
-  final object Offline extends HostedPartition
+  final case class Offline(topicId: Option[Uuid]) extends HostedPartition

Review Comment:
   Should this instead be `TopicIdPartition`, as `Offline` here represents a partition and not a topic per se?




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1469680659


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   > Is it because there are potentially offline log dir might already have the 
logs? If so, does that mean we can't create any new topic while 1 log dir goes 
offline? 
   
   My understanding of the original logic before JBOD is that we block creating a topic partition on a broker if `partitionState.isNew` is false and the broker has any offline log directories. This logic is in `LogManager.getOrCreateLog`, which will throw `KafkaStorageException`. I'm not sure we need to throw it here in `Partition.createLogInAssignedDirectoryId` if it will be thrown anyway in `LogManager.getOrCreateLog`.
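   A minimal sketch of the pre-JBOD guard described above, assuming it only runs after no existing log for the partition has been found in an online directory. `GetOrCreateLogGuard` and `StorageException` are hypothetical; the real code throws `KafkaStorageException` from `LogManager.getOrCreateLog`, and the message text here is illustrative only.

```scala
object GetOrCreateLogGuard {
  // Hypothetical stand-in for Kafka's KafkaStorageException.
  final class StorageException(message: String) extends RuntimeException(message)

  // Refuse to create a log for a replica that is not new while any log
  // directory is offline: its data may already live on the offline disk.
  def checkCanCreate(topicPartition: String, isNew: Boolean, offlineDirs: Seq[String]): Unit =
    if (!isNew && offlineDirs.nonEmpty)
      throw new StorageException(
        s"Cannot create log for $topicPartition: offline log dirs ${offlineDirs.mkString(", ")}")
}
```

   New replicas always pass the check, which is consistent with the KIP-112 rule quoted in this thread: only replicas with `isNewReplica=False` that are not found on a good log directory get `KafkaStorageException`.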





Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-26 Thread via GitHub


gaurav-narula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1467590251


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   IIUC, it's from 
[KIP-112](https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD)
   
   > The broker will specify error=KafkaStorageException for those partitions 
that are in the LeaderAndIsrRequest with isNewReplica=False but not found on 
any good log directory. 




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-26 Thread via GitHub


showuon commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1467584667


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   TBH, I don't understand why we should throw an exception [here](https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/core/src/main/scala/kafka/log/LogManager.scala#L1009). Is it because a potentially offline log dir might already have the logs? If so, does that mean we can't create any new topic while 1 log dir goes offline? Sorry, I'm not very familiar with this part of the logic.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-25 Thread via GitHub


gnarula commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1466580289


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially offline log " +

Review Comment:
   I'm not quite sure if we should also be throwing an exception here.




Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-25 Thread via GitHub


gaurav-narula commented on PR #15263:
URL: https://github.com/apache/kafka/pull/15263#issuecomment-1910373586

   CC: @OmniaGM @cmccabe @pprovenzano  @showuon 



[PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-01-25 Thread via GitHub


gaurav-narula opened a new pull request, #15263:
URL: https://github.com/apache/kafka/pull/15263

   In KRaft mode, the broker fails to handle topic recreation correctly with broken disks. This is because `ReplicaManager` tracks `HostedPartition`s that are on an offline disk, but it doesn't associate topic ID information with them.
   
   This change updates `HostedPartition.Offline` to associate topic ID information. We also update the log creation logic in `Partition::createLogInAssignedDirectoryId` to not rely solely on `targetLogDirectoryId == DirectoryId.UNASSIGNED` to determine whether the log to be created is "new".
   
   Please refer to the comments in 
https://issues.apache.org/jira/browse/KAFKA-16157 for more information.

