dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080
########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + Review comment: Could we add the topic id to the `toString`? ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { + val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { + val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort + val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID Review comment: nit: `topicId` instead of `topicIdRaw`? ########## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ########## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { + partitionMapLock.lockInterruptibly() + try { + partitions.foreach { tp => + val currentState = partitionStates.stateValue(tp) + if (currentState != null) { + val updatedState = currentState.updateTopicId(topicIds(tp.topic)) + partitionStates.updateAndMoveToEnd(tp, updatedState) Review comment: We need to address this point. ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " + s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " + s"id $correlationId epoch $controllerEpoch") + if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader)) Review comment: We need to address this point. ########## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ########## @@ -929,6 +929,29 @@ class AbstractFetcherThreadTest { fetcher.verifyLastFetchedEpoch(partition, Some(5)) } + @Test + def testMaybeUpdateTopicIds(): Unit = { + val partition = new TopicPartition("topic1", 0) + val fetcher = new MockFetcherThread + + // Start with no topic IDs + fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) + fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0))) + + + def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + verifyFetchState(fetcher.fetchState(partition), None) + + // Add topic ID + fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName)) Review comment: Right. We should get here if the partition has a zero topic ID, I think. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { + val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { + val aliveBrokersIds = Seq(0, 1) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) + try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { + val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort + val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID + val topicIdOpt = if (usesTopicIds) Some(topicId) else None + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) + try { + val topicPartition = new TopicPartition(topic, 0) + val aliveBrokersIds = Seq(0, 1) + replicaManager.createPartition(topicPartition) + .createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(0, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(topicIdRaw, tp, aliveBrokersIds, leaderAndIsr, version = version) + replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + val partition = replicaManager.getPartitionOrException(tp) + assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size) + + // Append a couple of messages. + for (i <- 1 to 40) { + val records = TestUtils.singletonRecords(s"message $i".getBytes) + appendRecords(replicaManager, tp, records).onFire { response => + assertEquals(Errors.NONE, response.error) + } + } + + // find the live and different folder Review comment: nit: Could we update the comments to start with a capital letter and to finish with a dot similarly to the comment above? Or, update the comment above to look like this one. I don't mind about the format but only about the consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org