Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon merged PR #15557:
URL: https://github.com/apache/kafka/pull/15557


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1559170927


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  if (this.promotionStates.containsKey(topicPartition)) {
+val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionStates.get(topicPartition)
+// Revert any reassignments for partitions that did not complete the 
future replica promotion
+if (originalDir.isDefined && topicId.isDefined && 
reassignmentState.maybeInconsistentMetadata) {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () 
=> ())
+}
+this.promotionStates.remove(topicPartition)
+  }

Review Comment:
   Thanks for the explanation!






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2047083295

   Failed tests are unrelated.





Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-09 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1557584496


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  if (this.promotionStates.containsKey(topicPartition)) {
+val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionStates.get(topicPartition)
+// Revert any reassignments for partitions that did not complete the 
future replica promotion
+if (originalDir.isDefined && topicId.isDefined && 
reassignmentState.maybeInconsistentMetadata) {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () 
=> ())
+}
+this.promotionStates.remove(topicPartition)
+  }

Review Comment:
   Great question.
   
   > in what circumstance, the promotionStates will not contain the topic 
partition?
   
   `promotionStates` will not contain the partition if it is either removed 
twice, or removed without ever having been added. The 
`addPartitions/removePartitions` semantics in `ReplicaAlterLogDirsThread` seem 
to have been intended to tolerate this behavior. Have a look at 
`handleLogDirFailure` in `ReplicaManager`: when a directory fails, all 
partitions in the failed directory end up being passed to `removePartitions` 
(via `removeFetcherForPartitions`), regardless of whether they were ever added. 
So `removePartitions` needs to be idempotent and tolerate unknown partitions.
   
   > before this change, we'll send out AssignReplicasToDirsRequest no matter 
what, but now, we will skip when no PromotionState
   
   No, I think the behavior here is the same as before this change. Previously, 
`partitionState <- partitionAssignmentRequestState(topicPartition)` ran inside 
the Scala `for` comprehension, and if 
`partitionAssignmentRequestState(topicPartition)` resulted in `None`, that 
partition was skipped. That's equivalent to an `if` gate on 
`this.promotionStates.containsKey(topicPartition)`.
   
   > Should we still invoke directoryEventHandler.handleAssignment even if no 
PromotionState? I think no, but I'd like to hear your thought here.
   
   I agree we should not invoke `directoryEventHandler.handleAssignment` if 
there is no `PromotionState`, because it's only created if `addPartitions` is 
called for the partition.
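
To illustrate the equivalence described above, here is a minimal sketch. It is not the Kafka code: `RemoveGateSketch`, `State`, `states`, `revertOld` and `revertNew` are hypothetical stand-ins, and partitions are plain strings. It only shows that an `Option` generator inside a `for` comprehension skips unknown keys in the same way as an explicit `containsKey` check, and that removing the entry makes a repeated call a no-op.

```scala
import java.util.concurrent.ConcurrentHashMap

object RemoveGateSketch {
  // Hypothetical stand-in for the per-partition state; not Kafka's real type.
  final case class State(queued: Boolean)

  val states = new ConcurrentHashMap[String, State]()

  // Old shape: the Option generator yields nothing for an unknown key,
  // so unknown partitions never reach the body of the comprehension.
  def revertOld(partitions: Set[String]): Set[String] =
    for {
      p <- partitions
      s <- Option(states.get(p))
      if s.queued
    } yield p

  // New shape: an explicit gate on containsKey, followed by removal of the
  // entry, which is what makes a second call for the same partition a no-op.
  def revertNew(partitions: Set[String]): Set[String] = {
    val reverted = Set.newBuilder[String]
    for (p <- partitions) {
      if (states.containsKey(p)) {
        if (states.get(p).queued) reverted += p
        states.remove(p)
      }
    }
    reverted.result()
  }
}
```

Both functions return only the known partitions whose state is queued; a partition that was never added, or was already removed, is ignored by either shape.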






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-08 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1555305760


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  if (this.promotionStates.containsKey(topicPartition)) {
+val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionStates.get(topicPartition)
+// Revert any reassignments for partitions that did not complete the 
future replica promotion
+if (originalDir.isDefined && topicId.isDefined && 
reassignmentState.maybeInconsistentMetadata) {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () 
=> ())
+}
+this.promotionStates.remove(topicPartition)
+  }

Review Comment:
   In your opinion, in what circumstance will `promotionStates` not contain 
the topic partition? Maybe when `removePartitions` gets called multiple 
times? I'm asking because, before this change, we'd send out 
`AssignReplicasToDirsRequest` no matter what, but now we will skip it when 
there is no `PromotionState`. Will that cause any potential problem? Maybe 
when upgrading? (Is it possible?) Should we still invoke 
`directoryEventHandler.handleAssignment` even if there is no `PromotionState`? 
I think no, but I'd like to hear your thoughts here.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-07 Thread via GitHub


soarez commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2041535672

   I think I've addressed the issues. The failing tests on the most recent 
build are unrelated. PTAL @showuon 





Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-02 Thread via GitHub


showuon commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2033422766

   There are quite a few failed tests because of this change. Could you take a 
look?
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15557/4





Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on PR #15557:
URL: https://github.com/apache/kafka/pull/15557#issuecomment-2030993093

   There's a compilation error in jdk8_scala2.12:
   
   ```
   [2024-04-01T13:50:09.625Z] [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15557@2/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:119:9:
 ambiguous reference to overloaded definition,
   [2024-04-01T13:50:09.625Z] both method debug in trait Logger of type (x$1: 
String, x$2: Object*)Unit
   [2024-04-01T13:50:09.625Z] and  method debug in trait Logger of type (x$1: 
String, x$2: Any, x$3: Any)Unit
   [2024-04-01T13:50:09.625Z] match argument types 
(String,org.apache.kafka.common.TopicPartition,kafka.server.ReplicaAlterLogDirsThread.ReassignmentState)
   ```
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15557/3/pipeline
   
   Please help fix it. Thanks.





Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1547006909


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {

Review Comment:
   Sounds good to me. Thanks.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546282044


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {

Review Comment:
   Yes, I think so. I'll add a debug log to `updateReassignmentState`, so we 
can log every state transition. In that situation, if we don't see the 
respective state transition, that would indicate the future log has not caught 
up.
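
As a rough sketch of what logging every state transition could look like: the state names `None`, `Queued` and `Accepted` are taken from the diff, but everything else here is an assumption for illustration (`TransitionLogSketch`, string partition keys, and a `debugLog` buffer standing in for a real debug logger). This is not the code in the PR.

```scala
import scala.collection.mutable

object TransitionLogSketch {
  // State names mirror the diff; the rest is a hypothetical stand-in.
  sealed trait ReassignmentState
  object ReassignmentState {
    case object None extends ReassignmentState
    case object Queued extends ReassignmentState
    case object Accepted extends ReassignmentState
  }

  // Stand-in for a debug logger: collects messages so they can be inspected.
  val debugLog = mutable.ArrayBuffer.empty[String]

  private val states = mutable.Map.empty[String, ReassignmentState]

  // Partitions with no recorded state are treated as being in state None.
  def currentState(partition: String): ReassignmentState =
    states.getOrElse(partition, ReassignmentState.None)

  // Record the new state and log the transition in a single place, so a
  // missing log line means the transition never happened.
  def updateReassignmentState(partition: String, next: ReassignmentState): Unit = {
    val previous = currentState(partition)
    states.put(partition, next)
    debugLog += s"Updated reassignment state for $partition from $previous to $next"
  }
}
```

Because the message is built with string interpolation, the logger only ever receives a single `String`, which keeps the call independent of any overloaded format-string variants.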






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546280257


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())

Review Comment:
   Good idea. I'll test that in `ReplicationControlManagerTest`.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546269825


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())

Review Comment:
   For (3), I had a quick look, and indeed it's not an easy test. 
   Could we instead test the logic in `ReplicationControlManager`? That is, 
add a test in `ReplicationControlManagerTest` to make sure that after receiving 
a reassignment request -> log failure request -> reverted reassignment request, 
it is still able to elect another leader, etc.?






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546251867


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {

Review Comment:
   What I'm afraid of is that the future log somehow never gets promoted 
because it cannot catch up with the local log. In that case, users might want 
a clue from the log output. WDYT? If you think it's too noisy, we can leave it 
as is and add it in the future if necessary.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546237766


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-  partition.futureReplicaDirectoryId()
-.map(id => {
-  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), id,
-() => 
updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-  // mark the assignment request state as queued.
-  
updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-})
+  val targetDir = partition.futureReplicaDirectoryId().get
+  val topicIdPartition = new TopicIdPartition(topicId.get, 
topicPartition.partition())
+  directoryEventHandler.handleAssignment(topicIdPartition, targetDir, 
() => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+  updateReassignmentState(topicPartition, ReassignmentState.Queued)
 })
-  case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+  case ReassignmentState.Accepted =>
 // Promote future replica if controller accepted the request and the 
replica caught-up with the original log.
 if (partition.maybeReplaceCurrentWithFutureReplica()) {

Review Comment:
   Same as with L127, let's continue in the other thread.






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546237017


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {

Review Comment:
   I think that would be a very noisy log, since this method is called for every 
fetched batch. I'm not sure how it would be useful. What do you have in mind?






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


soarez commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546234986


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())

Review Comment:
   Good questions.
   
   1. Yes. See `ReplicationController.handleAssignReplicasToDirs`. If a replica 
is assigned to a directory that is not an online directory in the broker, 
leadership and ISR are updated.
   2. Yes, I'll update the KIP. At the time, I thought it made more sense to 
use `UUID.LOST_DIR`, but in practice it's a more general solution to just 
revert the assignment.
   3. I've started but haven't yet been able to come up with an integration 
test for this. It requires blocking the process of replica fetching and 
assignment processing and injecting failures at very specific times. Let me 
know if you think it's critical to have an integration test for this, and I'll 
give it another go.
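
   For readers following point 2, here is a minimal sketch of the "revert the 
assignment" behaviour, not the code in the PR. It assumes plain strings in place 
of `Uuid` and `TopicPartition`, a hypothetical `RecordingHandler` in place of the 
directory event handler, and that the truncated `Accepted` case also sets the 
flag. Using `remove` to get an `Option` is a choice made for the sketch.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for the types in the diff; IDs are plain strings.
sealed trait ReassignmentState { def inconsistentMetadata: Boolean = false }
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState {
    override def inconsistentMetadata: Boolean = true
  }
  // Assumption: the Accepted case (truncated in the quoted diff) also overrides the flag.
  case object Accepted extends ReassignmentState {
    override def inconsistentMetadata: Boolean = true
  }
}

final case class PromotionState(reassignmentState: ReassignmentState,
                                topicId: String,
                                originalDir: String)

// Records assignment requests instead of sending them to a controller.
final class RecordingHandler {
  val requests = mutable.ListBuffer.empty[(String, String, String)]
  def handleAssignment(topicId: String, partition: String, dir: String): Unit =
    requests += ((topicId, partition, dir))
}

final class PromotionTracker(handler: RecordingHandler) {
  private val promotionStates = mutable.Map.empty[String, PromotionState]

  def track(partition: String, state: PromotionState): Unit =
    promotionStates.put(partition, state)

  // On removal, point the metadata back at the original directory for every
  // partition whose promotion did not finish, then forget the partition.
  // Partitions that were never tracked are skipped.
  def removePartitions(partitions: Set[String]): Unit =
    for (partition <- partitions) {
      promotionStates.remove(partition).foreach { state =>
        if (state.reassignmentState.inconsistentMetadata)
          handler.handleAssignment(state.topicId, partition, state.originalDir)
      }
    }
}
```

   Only the partition with a queued or accepted request triggers a revert; one 
with no request, or one that was never tracked, is simply dropped.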






Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546153410


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () 
=> ())
+for (topicPartition <- topicPartitions) {
+  // Revert any reassignments for partitions that did not complete the 
future replica promotion
+  val PromotionState(reassignmentState, topicId, originalDir) = 
this.promotionState(topicPartition)
+  if (reassignmentState.inconsistentMetadata) {
+directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, 
topicPartition.partition()), originalDir, () => ())
+  }
+
+  this.promotionStates.remove(topicPartition)
+}
 
 super.removePartitions(topicPartitions)
   }
 
+  private def promotionState(topicPartition: TopicPartition): PromotionState = 
promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): 
ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: 
TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): 
Unit = {
-assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, 
state: ReassignmentState): Unit = {
+promotionStates.put(topicPartition, 
promotionState(topicPartition).withAssignment(state))
   }
 
   private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
 val topicId = partition.topicId
 if (topicId.isEmpty)
   throw new IllegalStateException(s"Topic ${topicPartition.topic()} does 
not have an ID.")
 
-partitionAssignmentRequestState(topicPartition) match {
-  case None =>
+reassignmentState(topicPartition) match {
+  case ReassignmentState.None =>
 // Schedule assignment request and don't promote the future replica 
yet until the controller has accepted the request.
 partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-  partition.futureReplicaDirectoryId()
-.map(id => {
-  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), id,
-() => 
updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-  // mark the assignment request state as queued.
-  
updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-})
+  val targetDir = partition.futureReplicaDirectoryId().get
+  val topicIdPartition = new TopicIdPartition(topicId.get, 
topicPartition.partition())
+  directoryEventHandler.handleAssignment(topicIdPartition, targetDir, 
() => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+  updateReassignmentState(topicPartition, ReassignmentState.Queued)
 })
-  case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+  case ReassignmentState.Accepted =>
 // Promote future replica if controller accepted the request and the 
replica caught-up with the original log.
 if (partition.maybeReplaceCurrentWithFutureReplica()) {

Review Comment:
   Should we log something at debug or trace level if 
`maybeReplaceCurrentWithFutureReplica` returns false because the future replica 
has not caught up?
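
   To place the question in context, here is a rough model of the state 
progression in the hunk above, with the branch being asked about marked. The 
`step` function, the `Outcome` type, and the `caughtUp` and `replaced` 
parameters are hypothetical stand-ins for the `Partition` calls, and the 
`Queued` branch is an assumption because the quoted hunk stops before it.

```scala
// Hypothetical model: state names mirror the diff, everything else is illustrative.
sealed trait ReassignmentState
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState
  case object Accepted extends ReassignmentState
}

object PromotionStep {
  sealed trait Outcome
  case object RequestQueued extends Outcome // assignment request handed to the event handler
  case object Waiting extends Outcome       // nothing happened on this call
  case object Promoted extends Outcome      // future replica replaced the current one

  // One call models one invocation of the promotion check.
  // `replaced` is by-name so it is only evaluated in the Accepted state.
  def step(state: ReassignmentState,
           caughtUp: Boolean,
           replaced: => Boolean): (ReassignmentState, Outcome) =
    state match {
      case ReassignmentState.None if caughtUp => (ReassignmentState.Queued, RequestQueued)
      case ReassignmentState.None             => (ReassignmentState.None, Waiting)
      // Assumption: while queued, wait for the controller callback to move to Accepted.
      case ReassignmentState.Queued           => (ReassignmentState.Queued, Waiting)
      case ReassignmentState.Accepted =>
        if (replaced) (ReassignmentState.Accepted, Promoted)
        else (ReassignmentState.Accepted, Waiting) // the "returned false" case in question
    }
}
```

   In this model the `Waiting` outcome in the `Accepted` state is where such a 
log line would go.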



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
-// Schedule assignment request to revert any queued request before 
cancelling
-for {
-  topicPartition <- topicPartitions
-  partitionState <- partitionAssignmentRequestState(topicPartition)
-  if partitionState == QUEUED
-  partition = replicaMgr.getPartitionOrException(topicPartition)
-  topicId <- partition.topicId
-  directoryId <- partition.logDirectoryId()
-  topicIdPartition = new TopicIdPartition(topicId, 
topicPartition.partition())
-} 

Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]

2024-04-01 Thread via GitHub


showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546078129


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId   The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+  PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+/**
+ * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+ * directory where the main replica is hosted.
+ */
+def inconsistentMetadata: Boolean = false

Review Comment:
   If it's "maybe" inconsistent, should we make it clear in the variable name?
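
   As an illustration of the suggested rename, here is a cut-down copy of the 
trait with the flag called `maybeInconsistentMetadata`, the name that appears in 
the later revision of this diff quoted at the top of the thread. Only two of the 
states are reproduced, and the doc comment wording is mine.

```scala
// Illustrative only: a reduced copy of the trait with the flag renamed.
sealed trait ReassignmentState {
  /** True when cluster metadata may (not must) disagree with where the main replica is hosted. */
  def maybeInconsistentMetadata: Boolean = false
}

object ReassignmentState {
  /** The request has not been created. */
  case object None extends ReassignmentState

  /** The request has been queued; it may or may not have reached the controller yet. */
  case object Queued extends ReassignmentState {
    override def maybeInconsistentMetadata: Boolean = true
  }
}
```

   The `maybe` prefix tells the caller that a revert is a precaution and does not 
confirm that the metadata is wrong.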



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId   The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+  PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+/**
+ * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+ * directory where the main replica is hosted.
+ */
+def inconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+/**
+ * The request has not been created.
+ */
+case object None extends ReassignmentState
 
-  case object COMPLETED extends DirectoryEventRequestState
+/**
+ * The request has been queued, it may or may not yet have been sent to 
the Controller.
+ */
+case object Queued extends ReassignmentState{
+  override def inconsistentMetadata: Boolean = true
+}
+
+/**
+ * The controller has acknowledged the new directory assignment and 
persisted the change in metadata.
+ */
+case object Accepted extends ReassignmentState{

Review Comment:
   ditto



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }
 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory 
assignment update in the metadata
+   * @param topicId   The ID of the topic, which is useful if a 
reverting the assignment is required
+   * @param currentDir The original directory ID from which the future 
replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: 
Uuid, currentDir: Uuid) {
+def withAssignment(newDirReassignmentState: ReassignmentState): 
PromotionState =
+  PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment 
from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+/**
+ * @return true if the directory assignment in the cluster metadata may be 
inconsistent with the actual
+ * directory where the main replica is hosted.
+ */
+def inconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {
 
-  case object QUEUED extends DirectoryEventRequestState
+/**
+ * The request has not been created.
+ */
+case object None extends ReassignmentState
 
-  case object COMPLETED extends