Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon merged PR #15557: URL: https://github.com/apache/kafka/pull/15557 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1559170927

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
```

Review Comment: Thanks for the explanation!
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on PR #15557: URL: https://github.com/apache/kafka/pull/15557#issuecomment-2047083295 Failed tests are unrelated.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1557584496

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
```

Review Comment: Great question.

> in what circumstance, the promotionStates will not contain the topic partition?

`promotionStates` will not contain the partition if it is either removed twice, or removed without having been added first. The `addPartitions/removePartitions` semantics in `ReplicaAlterLogDirsThread` seem to have been intended to tolerate this behavior. Have a look at `handleLogDirFailure` in `ReplicaManager`: when a directory fails, all partitions in the failed directory end up being passed to `removePartitions` (via `removeFetcherForPartitions`), regardless of whether they were ever added. So `removePartitions` needs to be idempotent and tolerate unknown partitions.

> before this change, we'll send out AssignReplicasToDirsRequest no matter what, but now, we will skip when no PromotionState

No, I think the behavior here is the same as before this change. Previously, `partitionState <- partitionAssignmentRequestState(topicPartition)` ran inside the Scala `for` comprehension, and if `partitionAssignmentRequestState(topicPartition)` returned `None`, execution stopped for that partition. That's equivalent to an `if` gate on `this.promotionStates.containsKey(topicPartition)`.

> Should we still invoke directoryEventHandler.handleAssignment even if no PromotionState? I think no, but I'd like to hear your thought here.

I agree we should not invoke `directoryEventHandler.handleAssignment` if there is no `PromotionState`, because a `PromotionState` is only created when `addPartitions` is called for the partition.
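To illustrate the equivalence argued above, here is a minimal, self-contained Scala sketch. It is not the Kafka code: `RemoveSketch`, its `PromotionState` (a `queued` flag plus an optional original directory) and `revertsSent` are hypothetical stand-ins for `promotionStates` and `directoryEventHandler.handleAssignment`. The old-style `for` comprehension stops at the `Option` lookup for an unknown partition, the new style uses an explicit `contains` gate, and both tolerate repeated removals.

```scala
import scala.collection.mutable

object RemoveSketch {
  // Hypothetical stand-in for the per-partition promotion state.
  final case class PromotionState(queued: Boolean, originalDir: Option[String])

  val promotionStates = mutable.Map.empty[String, PromotionState]
  // Records (partition, directory) pairs instead of sending real requests.
  val revertsSent = mutable.ListBuffer.empty[(String, String)]

  // Old shape: when the Option lookup yields None, the rest of the
  // comprehension is skipped, so nothing is sent for unknown partitions.
  def removeWithFor(partitions: Set[String]): Unit = {
    for {
      tp <- partitions
      state <- promotionStates.get(tp)
      if state.queued
      dir <- state.originalDir
    } revertsSent += (tp -> dir)
    partitions.foreach(tp => promotionStates.remove(tp))
  }

  // New shape: an explicit gate with the same effect. Removing an absent
  // key is a no-op, so calling this twice for one partition is harmless.
  def removeWithIf(partitions: Set[String]): Unit =
    for (tp <- partitions) {
      if (promotionStates.contains(tp)) {
        val state = promotionStates(tp)
        if (state.queued && state.originalDir.isDefined)
          revertsSent += (tp -> state.originalDir.get)
        promotionStates.remove(tp)
      }
    }
}
```

In both shapes, a second call for the same partition, or a call for a partition that was never added, sends nothing.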
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1555305760

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      if (this.promotionStates.containsKey(topicPartition)) {
+        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
+        // Revert any reassignments for partitions that did not complete the future replica promotion
+        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
+          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get, () => ())
+        }
+        this.promotionStates.remove(topicPartition)
+      }
```

Review Comment: In your opinion, in what circumstances will `promotionStates` not contain the topic partition? Maybe when `removePartitions` gets called multiple times? I'm asking because before this change, we'd send out `AssignReplicasToDirsRequest` no matter what, but now we skip it when there is no `PromotionState`. Will that cause any potential problem? Maybe when upgrading? (Is it possible?) Should we still invoke `directoryEventHandler.handleAssignment` even if there is no `PromotionState`? I think not, but I'd like to hear your thoughts here.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on PR #15557: URL: https://github.com/apache/kafka/pull/15557#issuecomment-2041535672 I think I've addressed the issues. The failing tests on the most recent build are unrelated. PTAL @showuon
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on PR #15557: URL: https://github.com/apache/kafka/pull/15557#issuecomment-2033422766 Quite a few tests are failing because of this change. Could you take a look? https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15557/4
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on PR #15557: URL: https://github.com/apache/kafka/pull/15557#issuecomment-2030993093 There's a compilation error in jdk8_scala2.12:

```
[2024-04-01T13:50:09.625Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15557@2/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:119:9: ambiguous reference to overloaded definition,
[2024-04-01T13:50:09.625Z] both method debug in trait Logger of type (x$1: String, x$2: Object*)Unit
[2024-04-01T13:50:09.625Z] and method debug in trait Logger of type (x$1: String, x$2: Any, x$3: Any)Unit
[2024-04-01T13:50:09.625Z] match argument types (String,org.apache.kafka.common.TopicPartition,kafka.server.ReplicaAlterLogDirsThread.ReassignmentState)
```

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15557/3/pipeline

Please help fix it. Thanks.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1547006909

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
```

Review Comment: Sounds good to me. Thanks.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546282044

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
```

Review Comment: Yes, I think so. I'll add a debug log to `updateReassignmentState` so we can log every state transition. In that situation, if we don't see the corresponding state transition, that would indicate the future log has not caught up.
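A minimal sketch of logging every state transition, as proposed above. The names here (`ReassignmentState` with `None`, `Queued` and `Accepted`; `TransitionLog`; the `log` function parameter standing in for a logger's `debug`) are hypothetical simplifications, not the PR's actual definitions. Because a line is emitted only when the stored state changes, a partition whose future log never catches up never shows a `None -> Queued` line, which is the signal described above.

```scala
import scala.collection.mutable

// Hypothetical, simplified version of the states discussed in the thread.
sealed trait ReassignmentState
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState
  case object Accepted extends ReassignmentState
}

// Keeps per-partition state and reports each actual transition through `log`
// (a stand-in for a logger's debug method).
class TransitionLog(log: String => Unit) {
  private val states = mutable.Map.empty[String, ReassignmentState]

  def current(tp: String): ReassignmentState =
    states.getOrElse(tp, ReassignmentState.None)

  def update(tp: String, next: ReassignmentState): Unit = {
    val prev = current(tp)
    // Repeated updates to the same state are not logged again.
    if (prev != next) log(s"Reassignment state for $tp: $prev -> $next")
    states(tp) = next
  }
}
```

Logging at the update point, rather than in the per-batch promotion check, keeps the output proportional to the number of transitions instead of the number of fetched batches.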
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546280257

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
```

Review Comment: Good idea. I'll test that in `ReplicationControlManagerTest`.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546269825

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
```

Review Comment: For (3), I had a quick look, and indeed it's not an easy test. Could we instead test the logic in `ReplicationControlManager`? That is, add a test in `ReplicationControlManagerTest` to make sure that after receiving a reassignment request, then a log failure request, then a reverted reassignment request, the controller is able to elect another leader, etc.?
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546251867

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
```

Review Comment: What I'm afraid of is that the future log somehow never gets promoted because it cannot catch up with the local log; users might want a clue from the log output. WDYT? If you think it's too noisy, we can leave it as is and add it in the future if necessary.
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546237766

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-          partition.futureReplicaDirectoryId()
-            .map(id => {
-              directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
-                () => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-              // mark the assignment request state as queued.
-              updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-            })
+          val targetDir = partition.futureReplicaDirectoryId().get
+          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
+          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+          updateReassignmentState(topicPartition, ReassignmentState.Queued)
         })
-      case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+      case ReassignmentState.Accepted =>
         // Promote future replica if controller accepted the request and the replica caught-up with the original log.
         if (partition.maybeReplaceCurrentWithFutureReplica()) {
```

Review Comment: Same as with L127; let's continue in the other thread.
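The promotion flow in the diff above (request the directory assignment once the future replica has caught up, wait while the request is queued, and promote only after the controller's callback marks it accepted) can be sketched as a small state machine. This is a hypothetical toy model, not the Kafka implementation: `PromotionFlow`, `maybePromote`, the `caughtUp` flag and the `sendAssignment` function (standing in for `directoryEventHandler.handleAssignment`) are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical, simplified version of the states discussed in the thread.
sealed trait ReassignmentState
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState
  case object Accepted extends ReassignmentState
}

// Toy model of the promotion flow: request the directory assignment first and
// promote only once the controller's callback has marked it as accepted.
// `sendAssignment` stands in for directoryEventHandler.handleAssignment.
class PromotionFlow(sendAssignment: (String, () => Unit) => Unit) {
  private val states = mutable.Map.empty[String, ReassignmentState]
  val promoted = mutable.Set.empty[String]

  def state(tp: String): ReassignmentState =
    states.getOrElse(tp, ReassignmentState.None)

  // Called for every fetched batch; `caughtUp` says whether the future
  // replica has caught up with the current one.
  def maybePromote(tp: String, caughtUp: Boolean): Unit = state(tp) match {
    case ReassignmentState.None =>
      if (caughtUp) {
        // Mark as queued before sending, so a callback that happens to run
        // synchronously cannot be overwritten afterwards.
        states(tp) = ReassignmentState.Queued
        sendAssignment(tp, () => states.update(tp, ReassignmentState.Accepted))
      }
    case ReassignmentState.Queued =>
      () // Still waiting for the controller to accept the request.
    case ReassignmentState.Accepted =>
      if (caughtUp) promoted += tp
  }
}
```

One design note on the sketch: the diff calls `handleAssignment` before setting `Queued`, which presumably relies on the callback running later, once the controller responds. The sketch sets `Queued` first only so that it stays correct even if a caller invokes the callback inline.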
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546237017

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##

```diff
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty)
       throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
```

Review Comment: I think that would be a very noisy log: this method is called for every fetched batch. I'm not sure how it would be useful. What do you have in mind?
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
soarez commented on code in PR #15557: URL: https://github.com/apache/kafka/pull/15557#discussion_r1546234986 ## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ## @@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String, } override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = { -// Schedule assignment request to revert any queued request before cancelling -for { - topicPartition <- topicPartitions - partitionState <- partitionAssignmentRequestState(topicPartition) - if partitionState == QUEUED - partition = replicaMgr.getPartitionOrException(topicPartition) - topicId <- partition.topicId - directoryId <- partition.logDirectoryId() - topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition()) -} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ()) +for (topicPartition <- topicPartitions) { + // Revert any reassignments for partitions that did not complete the future replica promotion + val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition) + if (reassignmentState.inconsistentMetadata) { +directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ()) Review Comment: Good questions. 1. Yes. See `ReplicationController.handleAssignReplicasToDirs`. If a replica is assigned to a directory that is not an online directory in the broker, leadership and ISR is updated. 2. Yes, I'll update the KIP. At the time, I thought it made more sense to use `UUID.LOST_DIR`, but in practice it's a more general solution to just revert the assignment. 3. I've started but haven't yet been able to come up with an integration test for this. It requires blocking the process of replica fetching and assginment processing and injecting failures at very specific times. Let me know if you think it's critical to have an integration test for this, and I'll give it another go. 
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546153410

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##
@@ -96,57 +97,55 @@ class ReplicaAlterLogDirsThread(name: String,
   }

   override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
-    // Schedule assignment request to revert any queued request before cancelling
-    for {
-      topicPartition <- topicPartitions
-      partitionState <- partitionAssignmentRequestState(topicPartition)
-      if partitionState == QUEUED
-      partition = replicaMgr.getPartitionOrException(topicPartition)
-      topicId <- partition.topicId
-      directoryId <- partition.logDirectoryId()
-      topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
-    } directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())
+    for (topicPartition <- topicPartitions) {
+      // Revert any reassignments for partitions that did not complete the future replica promotion
+      val PromotionState(reassignmentState, topicId, originalDir) = this.promotionState(topicPartition)
+      if (reassignmentState.inconsistentMetadata) {
+        directoryEventHandler.handleAssignment(new TopicIdPartition(topicId, topicPartition.partition()), originalDir, () => ())
+      }
+
+      this.promotionStates.remove(topicPartition)
+    }
     super.removePartitions(topicPartitions)
   }

+  private def promotionState(topicPartition: TopicPartition): PromotionState = promotionStates.get(topicPartition)
+
+  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState = promotionState(topicPartition).reassignmentState
+
   // Visible for testing
-  private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
-    assignmentRequestStates.put(topicPartition, state)
+  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
+    promotionStates.put(topicPartition, promotionState(topicPartition).withAssignment(state))
   }

   private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
     val topicId = partition.topicId
     if (topicId.isEmpty) throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

-    partitionAssignmentRequestState(topicPartition) match {
-      case None =>
+    reassignmentState(topicPartition) match {
+      case ReassignmentState.None =>
         // Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
         partition.runCallbackIfFutureReplicaCaughtUp(_ => {
-          partition.futureReplicaDirectoryId()
-            .map(id => {
-              directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
-                () => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
-              // mark the assignment request state as queued.
-              updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
-            })
+          val targetDir = partition.futureReplicaDirectoryId().get
+          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
+          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
+          updateReassignmentState(topicPartition, ReassignmentState.Queued)
         })
-      case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
+      case ReassignmentState.Accepted =>
         // Promote future replica if controller accepted the request and the replica caught-up with the original log.
         if (partition.maybeReplaceCurrentWithFutureReplica()) {

Review Comment:
Should we log something at debug or trace level if `maybeReplaceCurrentWithFutureReplica` returns false because the future replica has not caught up?
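One way to picture the `maybePromoteFutureReplica` flow, including the debug log proposed in the comment above, is a single-partition sketch like the following. All names are hypothetical: `DeferredHandler` stands in for the directory event handler and holds each acceptance callback until `acceptAll()` is called, which also illustrates how a unit test might decide exactly when the controller's acceptance arrives (the timing problem mentioned in point 3 of soarez's reply). The `caughtUp` flag replaces the checks done by `Partition`, and `log` replaces Kafka's logger.

```scala
import scala.collection.mutable

sealed trait ReassignmentState
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState
  case object Accepted extends ReassignmentState
}

// Hypothetical handler: holds each acceptance callback until the caller releases it.
final class DeferredHandler {
  private val pending = mutable.Queue.empty[() => Unit]
  def handleAssignment(targetDir: String, onAccepted: () => Unit): Unit = pending.enqueue(onAccepted)
  def acceptAll(): Unit = while (pending.nonEmpty) pending.dequeue()()
}

// Hypothetical single-partition promoter; `caughtUp` stands in for the Partition checks.
final class PromoterSketch(handler: DeferredHandler, targetDir: String, log: String => Unit) {
  var state: ReassignmentState = ReassignmentState.None
  var promoted = false

  def maybePromote(caughtUp: Boolean): Unit = state match {
    case ReassignmentState.None =>
      // Request the new assignment only once the future replica has caught up.
      if (caughtUp) {
        // Recorded before the hand-off so a synchronous callback cannot be overwritten.
        state = ReassignmentState.Queued
        handler.handleAssignment(targetDir, () => state = ReassignmentState.Accepted)
      }
    case ReassignmentState.Queued =>
      () // Waiting for the controller to accept the assignment.
    case ReassignmentState.Accepted =>
      if (caughtUp) {
        promoted = true
      } else {
        // The debug-level message suggested in the review, instead of failing silently.
        log(s"Future replica for $targetDir has not caught up yet; deferring promotion")
      }
  }
}

val messages = mutable.ArrayBuffer.empty[String]
val handler = new DeferredHandler
val promoter = new PromoterSketch(handler, "dir-b", msg => messages.append(msg))

promoter.maybePromote(caughtUp = true)  // None -> Queued
promoter.maybePromote(caughtUp = true)  // still Queued: no promotion before acceptance
handler.acceptAll()                     // the "controller" accepts: Queued -> Accepted
promoter.maybePromote(caughtUp = false) // lagging again: logged, not promoted
promoter.maybePromote(caughtUp = true)  // promoted
```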
Re: [PR] KAFKA-16297: Race condition while promoting future replica [kafka]
showuon commented on code in PR #15557:
URL: https://github.com/apache/kafka/pull/15557#discussion_r1546078129

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }

 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
+   * @param topicId The ID of the topic, which is useful if a reverting the assignment is required
+   * @param currentDir The original directory ID from which the future replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: Uuid, currentDir: Uuid) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def inconsistentMetadata: Boolean = false

Review Comment:
If it's "maybe" inconsistent, should we make it clear in the variable name?
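A small sketch of `PromotionState` with the predicate renamed to `maybeInconsistentMetadata`, as the review comment asks. It uses `String` in place of Kafka's `Uuid` (an assumption made only to keep the example self-contained) and implements `withAssignment` with the case class's generated `copy` method, which returns the same value as the explicit constructor call in the diff while carrying over the other fields automatically.

```scala
sealed trait ReassignmentState {
  /** May the directory assignment in metadata differ from where the replica actually lives? */
  def maybeInconsistentMetadata: Boolean = false
}
object ReassignmentState {
  case object None extends ReassignmentState
  case object Queued extends ReassignmentState {
    override def maybeInconsistentMetadata: Boolean = true
  }
}

// String replaces org.apache.kafka.common.Uuid here to keep the sketch self-contained.
final case class PromotionState(reassignmentState: ReassignmentState, topicId: String, currentDir: String) {
  // Equivalent to PromotionState(newState, topicId, currentDir); copy keeps every other field.
  def withAssignment(newState: ReassignmentState): PromotionState =
    copy(reassignmentState = newState)
}

val initial = PromotionState(ReassignmentState.None, "topic-id", "dir-a")
val queued = initial.withAssignment(ReassignmentState.Queued)
```

Because `PromotionState` is immutable, `initial` is unchanged after the call; only the returned value carries the new state.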
## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##
@@ -188,9 +194,52 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 }

 object ReplicaAlterLogDirsThread {
-  sealed trait DirectoryEventRequestState
+  /**
+   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
+   * @param topicId The ID of the topic, which is useful if a reverting the assignment is required
+   * @param currentDir The original directory ID from which the future replica fetches from
+   */
+  case class PromotionState(reassignmentState: ReassignmentState, topicId: Uuid, currentDir: Uuid) {
+    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
+      PromotionState(newDirReassignmentState, topicId, currentDir)
+  }
+
+  /**
+   * Represents the state of the request to update the directory assignment from the current replica directory
+   * to the future replica directory.
+   */
+  sealed trait ReassignmentState {
+    /**
+     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
+     *         directory where the main replica is hosted.
+     */
+    def inconsistentMetadata: Boolean = false
+  }
+
+  object ReassignmentState {

-  case object QUEUED extends DirectoryEventRequestState
+    /**
+     * The request has not been created.
+     */
+    case object None extends ReassignmentState

-  case object COMPLETED extends DirectoryEventRequestState
+    /**
+     * The request has been queued, it may or may not yet have been sent to the Controller.
+     */
+    case object Queued extends ReassignmentState{
+      override def inconsistentMetadata: Boolean = true
+    }
+
+    /**
+     * The controller has acknowledged the new directory assignment and persisted the change in metadata.
+     */
+    case object Accepted extends ReassignmentState{

Review Comment:
ditto
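Because `ReassignmentState` is sealed, a match over it can be checked for exhaustiveness, and naming one state `None` is safe as long as call sites qualify it, as `maybePromoteFutureReplica` does with `case ReassignmentState.None =>`. The sketch below, a simplified copy of the three states with their doc comments, shows both points; `describe` and `WildcardImportDemo` are hypothetical names used only for illustration. A wildcard import of the companion object makes the bare name `None` refer to the state rather than `scala.None`.

```scala
sealed trait ReassignmentState
object ReassignmentState {
  /** The request has not been created. */
  case object None extends ReassignmentState
  /** The request has been queued; it may or may not yet have been sent to the controller. */
  case object Queued extends ReassignmentState
  /** The controller has acknowledged the new directory assignment and persisted it in metadata. */
  case object Accepted extends ReassignmentState
}

// The trait is sealed, so the compiler warns if any state is missing from this match.
def describe(state: ReassignmentState): String = state match {
  case ReassignmentState.None     => "no request yet"
  case ReassignmentState.Queued   => "waiting for the controller"
  case ReassignmentState.Accepted => "accepted; promote once the future replica has caught up"
}

object WildcardImportDemo {
  import ReassignmentState._
  // This import shadows scala.None, so the bare name now means the state.
  val state: ReassignmentState = None
  // The standard empty Option must be written with its full name here.
  val empty: Option[Int] = scala.None
}
```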