[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714936472 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -929,6 +929,29 @@ class AbstractFetcherThreadTest { fetcher.verifyLastFetchedEpoch(partition, Some(5)) } + @Test + def testMaybeUpdateTopicIds(): Unit = { +val partition = new TopicPartition("topic1", 0) +val fetcher = new MockFetcherThread + +// Start with no topic IDs +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0))) + + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +verifyFetchState(fetcher.fetchState(partition), None) + +// Add topic ID +fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName)) Review comment: We could add tests for the ReplicaManager which verifies that the topic id is updated and propagated to the fetcher manager. Is it what you are thinking about? Regarding this particular test, what would you add? If the topic id is zero, we would just set it. As you said, the logic which prevents this from happening is before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714923995 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { +val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) +assertTrue(fetchState.isDefined) +assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { +val aliveBrokersIds = Seq(0, 1) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) +try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { +val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort +val topicIdRaw = if (usesTopicIds) topicId else 
Uuid.ZERO_UUID Review comment: Ah. It is because `topicId` is also used in the statement, so a local variable named `topicId` would shadow the class field. You could indeed use `this.topicId` there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + Review comment: Could we add the topic id to the `toString`? ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3469,4 +3472,88 @@ class ReplicaManagerTest { ClientQuotasImage.EMPTY ) } + + def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], + tp: TopicPartition, + expectedTopicId: Option[Uuid]): Unit = { +val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) +assertTrue(fetchState.isDefined) +assertEquals(expectedTopicId, fetchState.get.topicId) + } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = { +val aliveBrokersIds = Seq(0, 1) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + brokerId = 0, aliveBrokersIds) +try { + val tp = new TopicPartition(topic, 0) + val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0) + + val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse1.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None) + + val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr) + val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) + assertEquals(Errors.NONE, leaderAndIsrResponse2.error) + + assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId)) + +} finally { + replicaManager.shutdown(checkpointHW = false) +} + } + + @ParameterizedTest + 
@ValueSource(booleans = Array(true, false)) + def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { +val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort +val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID Review comment: nit: `topicId` instead of `topicIdRaw`? ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) +if (currentState != null) { + val updatedState = currentState.updateTopicId(topicIds(tp.topic)) + partitionStates.updateAndMoveToEnd(tp, updatedState) Review comment: We need to address this point. ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " + s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " + s"id $correlationId epoch $controllerEpoch") +if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader)) Review comment: We need to address this point. 
## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -929,6 +929,29 @@ class AbstractFetcherThreadTest { fetcher.verifyLastFetchedEpoch(partition, Some(5)) } + @Test + def testMaybeUpdateTopicIds(): Unit = { +val partition = new TopicPartition("topic1", 0) +val fetcher = new MockFetcherThread + +// Start with no topic IDs +fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) +fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0))) + + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +verifyFetchState(fetcher.fetchState(partition), None) + +// Add topic ID +fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName)) Review comment: Right. We should get here if the partition has
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714557835 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " + s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " + s"id $correlationId epoch $controllerEpoch") +if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader)) Review comment: Don't forget to remove `&& metadataCache.hasAliveBroker(partitionState.leader)` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714549045 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -133,4 +137,75 @@ class AbstractFetcherManagerTest { assertEquals(0, fetcherManager.deadThreadCount) EasyMock.verify(fetcher) } + + @Test + def testMaybeUpdateTopicIds(): Unit = { +val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread]) +val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { +fetcher + } +} + +val fetchOffset = 10L +val leaderEpoch = 15 +val tp1 = new TopicPartition("topic", 0) +val tp2 = new TopicPartition("topic", 1) +val topicId = Some(Uuid.randomUuid()) + +// Start out with no topic ID. +val initialFetchState1 = InitialFetchState( + topicId = None, + leader = new BrokerEndPoint(0, "localhost", 9092), + currentLeaderEpoch = leaderEpoch, + initOffset = fetchOffset) + +// Include a partition on a different leader +val initialFetchState2 = InitialFetchState( + topicId = None, + leader = new BrokerEndPoint(1, "localhost", 9092), + currentLeaderEpoch = leaderEpoch, + initOffset = fetchOffset) + +val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id) +val topicIds = (_: String) => topicId + +// Simulate calls to different fetchers due to different leaders +EasyMock.expect(fetcher.start()) +EasyMock.expect(fetcher.start()) + +EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1))) + .andReturn(Set(tp1)) +EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2))) + .andReturn(Set(tp2)) + +EasyMock.expect(fetcher.fetchState(tp1)) + .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) 
+EasyMock.expect(fetcher.fetchState(tp2)) + .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) + +EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds)) +EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds)) + +EasyMock.expect(fetcher.fetchState(tp1)) + .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) +EasyMock.expect(fetcher.fetchState(tp2)) + .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None))) +EasyMock.replay(fetcher) + +def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = { + assertTrue(fetchState.isDefined) + assertEquals(expectedTopicId, fetchState.get.topicId) +} + +fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 -> initialFetchState2)) +verifyFetchState(fetcher.fetchState(tp1), None) +verifyFetchState(fetcher.fetchState(tp2), None) + +fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds) +verifyFetchState(fetcher.fetchState(tp1), topicId) +verifyFetchState(fetcher.fetchState(tp2), topicId) + +EasyMock.verify(fetcher) Review comment: Right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) +if (currentState != null) { + val updatedState = currentState.updateTopicId(topicIds(tp.topic)) + partitionStates.updateAndMoveToEnd(tp, updatedState) Review comment: Right. `updateAndMoveToEnd` removes the item first and then adds it back at the end of the `LinkedHashMap`. We do rely on this to rotate the partitions in the `LinkedHashMap` in order to treat all partitions fairly. When we use a session, this is done on the broker as well btw. The new `update` method will only update the state and will overwrite any existing state in the `LinkedHashMap`. This is what `map.put()` does. However, it does not change the order in the `LinkedHashMap` which is good. We don't have to change the order when we set the topic id. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r713916798 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1424,6 +1428,21 @@ class ReplicaManager(val config: KafkaConfig, val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet updateLeaderAndFollowerMetrics(followerTopicSet) + if (topicIdUpdateFollowerPartitions.nonEmpty) +updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest) + + leaderAndIsrRequest.partitionStates.forEach { partitionState => +val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex) +/* + * If there is offline log directory, a Partition object may have been created by getOrCreatePartition() + * before getOrCreateReplica() failed to create local replica due to KafkaStorageException. + * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. + * we need to map this topic-partition to OfflinePartition instead. + */ +if (localLog(topicPartition).isEmpty) + markPartitionOffline(topicPartition) + } Review comment: I think that this code was removed in trunk. Did you bring it back by mistake? ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " + s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " + s"id $correlationId epoch $controllerEpoch") +if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader)) Review comment: nit: I think that I would prefer to have `metadataCache.hasAliveBroker(partitionState.leader)` in `updateTopicIdForFollowers` after all. We could do this check when we build the Map at L1778.
## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String, var partitionData: Seq[(TopicPartition, FetchData)] = null val request = fetchRequest.build() -val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo() +val topicIds = new mutable.HashMap[String, Uuid]() Review comment: We should add a small comment here which explains why it is actually OK to re-build the mapping from the request. It is not obvious at first. ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Set[Partition], +correlationId: Int, +topicIds: String => Option[Uuid]): Unit = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled + +try { + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionStates.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} since it is shutting down") + } +} + } else { +val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition => + partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1) +}.toMap + replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdateFollowerWithLeader, topicIds) + } +} catch { + case e: Throwable => +stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " + + s"received from controller $controllerId epoch $controllerEpoch", e) Review comment: Should we update this log to be specific about the topic ID update? 
## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Set[Partition], +correlationId: Int, +
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r710837733 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs") } + def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, Int)], topicIds: String => Option[Uuid]): Unit = { Review comment: 1) Should we call this one `maybeUpdateTopicIds`? Using `add` feels a bit awkward for the topic id field. Also, the `ToFetcherThread` is not necessary in my opinion because we already know that the fetcher manager is all about managing fetcher threads. 2) Could we use a `Map` instead of `Set`? `Map` feels a bit more natural for a key-value pair mapping. 3) Could we add a small scaladoc to explain the parameters? ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala ## @@ -20,7 +20,7 @@ import com.yammer.metrics.core.Gauge import kafka.cluster.BrokerEndPoint import kafka.metrics.KafkaYammerMetrics import kafka.utils.TestUtils -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{TopicPartition, Uuid} Review comment: Should we add a few unit tests for the newly added methods in this suite and in `AbstractFetcherThreadTest`?
## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs") } + def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, Int)], topicIds: String => Option[Uuid]): Unit = { +lock synchronized { + val partitionsPerFetcher = partitionsToUpdate.groupMap(partitionsToUpdate => BrokerIdAndFetcherId(partitionsToUpdate._2, getFetcherId(partitionsToUpdate._1)))(_._1) Review comment: We usually avoid using `._1` or `._2` and would rather prefer to deconstruct the tuple in order to name the argument: `groupMap { case (tp, leader) => `. Does `groupMap` work with all the versions of Scala that we use? ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { Review comment: If we rename `addTopicIdsToFetcherThread`, we should also rename this one to follow the same naming. 
## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -281,14 +281,14 @@ class ReplicaAlterLogDirsThread(name: String, val fetchRequestOpt = if (requestMap.isEmpty) { None } else { - val version: Short = if (topicIds.containsKey(tp.topic())) + val version: Short = if (fetchState.topicId.isEmpty) 12 else ApiKeys.FETCH.latestVersion // Set maxWait and minBytes to 0 because the response should return immediately if // the future log has caught up with the current log of the partition val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 0, 0, requestMap, -topicIds).setMaxBytes(maxBytes) +Collections.singletonMap(tp.topic(), fetchState.topicId.getOrElse(Uuid.ZERO_UUID))).setMaxBytes(maxBytes) Review comment: nit: `topic()` -> `topic`. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) +if (currentState != null) { + val updatedState = currentState.updateTopicId(topicIds(tp.topic)) + partitionStates.updateAndMoveToEnd(tp, updatedState) Review comment: I think that we should use `update` here instead of `updateAndMoveToEnd`. `updateAndMoveToEnd` will basically deprioritize the partition and I don't think that we want to do this when we only update its topic id. ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -2810,4 +2811,132 @@ class ReplicaManagerTest { Replicas.NONE, Replicas.NONE, 2, 123, 456, replicaManager.calculateDeltaChanges(TEST_DELTA)) } + + @Test + def testPartitionFetchStateUpdatesWithTopicIdAdde
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r710314809 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled + +val partitionsToUpdateFollower = mutable.Set.empty[Partition] +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. +// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { 
partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. +getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition))) +} + replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds) Review comment: I meant that in the above code, you could check if the leader exists while iterating over the partitions. We don't need a separate loop beforehand to do it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r710075575 ## File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala ## @@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs") } + def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = { +lock synchronized { + val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1) + + for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) { +val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId) Review comment: Yeah, that's right. I think that an important point is to let the FetcherManager compute the fetcher id based on the topic-partition instead of computing it in the replica manager. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r710074932 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled + +val partitionsToUpdateFollower = mutable.Set.empty[Partition] +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. +// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { 
partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. +getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition))) +} + replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds) Review comment: I don't think that this is strictly required. If we get a new request with the current epoch, the leader must be there. I think that we could combine this with the above code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { Review comment: nit: Should we prefix this method with `maybe` to indicate that it would set the topic id only if there is a state for the topic partition? ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String, } finally partitionMapLock.unlock() } + def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = { +partitionMapLock.lockInterruptibly() +try { + partitions.foreach { tp => +val currentState = partitionStates.stateValue(tp) Review comment: Should we ensure that there is actually a state? It must be there but it might be better to be safe. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String, */ private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = { if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) { - currentState + if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) { +currentState.updateTopicId(initialFetchState.topicId) + } else { +currentState + } Review comment: Is this change still necessary? 
## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled + +val partitionsToUpdateFollower = mutable.Set.empty[Partition] +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. +// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. 
+getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition))) Review comment: It looks like `addTopicIdsToFetcherThread` only needs the `leader`, the `topic-partition` (to compute the fetcher id), and the `topic id`. How about passing just those? I would also let the fetcher manager compute the fetcher id. ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r693836229 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: I still don't see how a follower would use the AlterIsr API, as a follower does not update the ISR; only a leader does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r693835412 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { +// Only change partition state when the leader is available +partitionsToUpdateFollower += partition + } else { +// The leader broker should always be present in the metadata cache. 
+// If not, we should record the error message and abort the transition process for this partition +stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + + s"(last update controller epoch ${partitionState.controllerEpoch}) " + + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") + } + } + + if (isShuttingDown.get()) { +if (traceLoggingEnabled) { + partitionsToUpdateFollower.foreach { partition => +stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " + + "since it is shutting down") + } +} + } else { +val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToUpdateFollower.map { partition => + val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache. +getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode()) + val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port()) + val log = partition.localLogOrException + val fetchOffset = initialFetchOffset(log) + partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset) +}.toMap + + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) Review comment: I have been thinking about this. At this point, the fetcher is already running, so we know that we only want to update the partition state with the newly assigned topic id. Creating the `InitialFetchState` seems a bit wasteful in this case, no? 
I wonder if it would not be better to add a new method to `AbstractFetcherManager` to basically update the partition states with the newly assigned topic ids instead. Is it something that you have already considered? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -82,7 +82,7 @@ class AbstractFetcherThreadTest { // add one partition to create the consumer lag metric fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) -fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) +fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0))) Review comment: nit: If we keep `topicIds` as a Scala map, we would get an Option directly with `topicIds.get(...)`. Also, a space is missing after the `,` and the `()` could be omitted ;). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -82,7 +82,7 @@ class AbstractFetcherThreadTest { // add one partition to create the consumer lag metric fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) -fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) +fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0))) Review comment: nit: If we keep `topicIds` as a Scala map, we would get an Option directly with `topicIds.get(...)`. Also, a space is missing after the `,`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692169643 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { Review comment: Intuitively, I would have thought that we don't need this. The broker is already a follower in this case, so the leader should be alive. What's your take on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692168365 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() Review comment: nit: We could omit specifying the type here and use `mutable.Set.empty[Partition]`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692166774 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? Review comment: I don't think so. Those logs, which are exactly the same as those we have in `makeFollowers`, would be really confusing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692165645 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { Review comment: Is `topicIds` necessary? I suppose that we could get the `topicId` from the `Partition`, couldn't we? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692164654 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1544,7 +1554,7 @@ class ReplicaManager(val config: KafkaConfig, // replica from source dir to destination dir logManager.abortAndPauseCleaning(topicPartition) - futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader, + futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic()), leader, Review comment: nit: parenthesis not required after `topic` ;) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692164057 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1420,6 +1423,10 @@ class ReplicaManager(val config: KafkaConfig, } val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) } + val partitionsToUpdateIdForFollower = topicIdUpdatePartitions.filter { case (_, partitionState) => +partitionState.leader != localBrokerId + } Review comment: Could we avoid having to re-iterate over `topicIdUpdatePartitions` by applying the condition before adding the partition to `topicIdUpdatePartitions`? I have opened a PR to do the same for the two collections above: https://github.com/apache/kafka/pull/11225 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692162163 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: Right. Reusing `makeFollowers` would not work out of the box. I am not sure I follow your point regarding the possibility of overriding a pending ISR state. I thought that only the leader updates the ISR state and the followers only update it via `updateAssignmentAndIsr` based on the controller state. I might be missing something. I am not against having `updateTopicIdForFollowers` but I wanted to ensure that we have thought about all the options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r690993461 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: Have we considered relying on `makeFollowers` instead of introducing `updateTopicIdForFollowers`? The two methods are really similar. The main notable difference is that `makeFollowers` shuts down the fetcher thread. We could perhaps optimize this part to not do it if the broker is already a follower for the partition. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -854,6 +859,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + s")" } + + def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = { +PartitionFetchState(topicId, fetchOffset, lag, currentLeaderEpoch, delay, state, lastFetchedEpoch) Review comment: nit: We could use `this.copy(topicId = topicId)`. ## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -262,7 +263,8 @@ class ReplicaAlterLogDirsThread(name: String, private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() -val topicIds = replicaMgr.metadataCache.topicNamesToIds() +val topicId = fetchState.topicId +val topicIds = Collections.singletonMap(tp.topic(), topicId.getOrElse(Uuid.ZERO_UUID)) Review comment: nit: I would bring this one closer to `requestBuilder` as it is only used by the request builder, or we could even inline it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r690993461 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: Have we considered relying on `makeFollowers` instead of introducing `updateTopicIdForFollowers`? The two methods are really similar. The main notable difference is that `makeFollowers` shuts down the fetcher thread. We could perhaps optimize this part to not do it if the broker is already a follower for the partition. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -854,6 +859,10 @@ case class PartitionFetchState(fetchOffset: Long, s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + s")" } + + def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = { +PartitionFetchState(topicId, fetchOffset, lag, currentLeaderEpoch, delay, state, lastFetchedEpoch) Review comment: nit: We could use `this.copy(topicId = topicId)`. ## File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala ## @@ -262,7 +263,8 @@ class ReplicaAlterLogDirsThread(name: String, private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() -val topicIds = replicaMgr.metadataCache.topicNamesToIds() +val topicId = fetchState.topicId +val topicIds = Collections.singletonMap(tp.topic(), topicId.getOrElse(Uuid.ZERO_UUID)) Review comment: nit: I would bring this one closer to `requestBuilder` as it is only used by the request builder, or we could even inline it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org