[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714936472



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
   We could add tests for the ReplicaManager which verify that the topic id is updated and propagated to the fetcher manager. Is that what you are thinking about?
   
   Regarding this particular test, what would you add? If the topic id is zero, we would just set it. As you said, the logic which prevents this from happening sits earlier in the call path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714923995



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
   ClientQuotasImage.EMPTY
 )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
   Ah. It is because `topicId` is also used on the right-hand side of the statement. You could indeed use `this.topicId` there.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714815080



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -862,6 +877,10 @@ case class PartitionFetchState(fetchOffset: Long,
   s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +

Review comment:
   Could we add the topic id to the `toString`?
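One way the `toString` could include the topic id, sketched on a simplified, hypothetical `PartitionFetchState` that models only a few fields. The real case class has more fields and uses `Option[Uuid]` for the topic id; `Option[String]` is used here only to keep the example self-contained.

```scala
object FetchStateToStringSketch {
  // Simplified, hypothetical stand-in for PartitionFetchState; only a few fields are modeled.
  final case class PartitionFetchState(topicId: Option[String],
                                       fetchOffset: Long,
                                       currentLeaderEpoch: Int,
                                       delayMs: Option[Long]) {
    // Topic id is printed first so it shows up in fetcher log lines.
    override def toString: String =
      s"FetchState(topicId=$topicId" +
        s", fetchOffset=$fetchOffset" +
        s", currentLeaderEpoch=$currentLeaderEpoch" +
        s", delay=${delayMs.getOrElse(0L)}ms" +
        ")"
  }
}
```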

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3469,4 +3472,88 @@ class ReplicaManagerTest {
   ClientQuotasImage.EMPTY
 )
   }
+
+  def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T],
+                                                          tp: TopicPartition,
+                                                          expectedTopicId: Option[Uuid]): Unit = {
+    val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp))
+    assertTrue(fetchState.isDefined)
+    assertEquals(expectedTopicId, fetchState.get.topicId)
+  }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+    val aliveBrokersIds = Seq(0, 1)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
+      brokerId = 0, aliveBrokersIds)
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
+
+      val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+
+      val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp, aliveBrokersIds, leaderAndIsr)
+      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ())
+      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
+
+      assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, Some(topicId))
+
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = {
+    val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort
+    val topicIdRaw = if (usesTopicIds) topicId else Uuid.ZERO_UUID

Review comment:
   nit: `topicId` instead of `topicIdRaw`?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   We need to address this point.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
           s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
           s"id $correlationId epoch $controllerEpoch")
+        if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   We need to address this point.

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -929,6 +929,29 @@ class AbstractFetcherThreadTest {
 fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val partition = new TopicPartition("topic1", 0)
+    val fetcher = new MockFetcherThread
+
+    // Start with no topic IDs
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> initialFetchState(None, 0L, leaderEpoch = 0)))
+
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    verifyFetchState(fetcher.fetchState(partition), None)
+
+    // Add topic ID
+    fetcher.maybeUpdateTopicIds(Set(partition), topicName => topicIds.get(topicName))

Review comment:
   Right. We should get here if the partition has 

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714557835



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
           s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
           s"id $correlationId epoch $controllerEpoch")
+        if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   Don't forget to remove `&& 
metadataCache.hasAliveBroker(partitionState.leader)` here.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714549045



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -133,4 +137,75 @@ class AbstractFetcherManagerTest {
 assertEquals(0, fetcherManager.deadThreadCount)
 EasyMock.verify(fetcher)
   }
+
+  @Test
+  def testMaybeUpdateTopicIds(): Unit = {
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        fetcher
+      }
+    }
+
+    val fetchOffset = 10L
+    val leaderEpoch = 15
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val topicId = Some(Uuid.randomUuid())
+
+    // Start out with no topic ID.
+    val initialFetchState1 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(0, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    // Include a partition on a different leader
+    val initialFetchState2 = InitialFetchState(
+      topicId = None,
+      leader = new BrokerEndPoint(1, "localhost", 9092),
+      currentLeaderEpoch = leaderEpoch,
+      initOffset = fetchOffset)
+
+    val partitionsToUpdate = Map(tp1 -> initialFetchState1.leader.id, tp2 -> initialFetchState2.leader.id)
+    val topicIds = (_: String) => topicId
+
+    // Simulate calls to different fetchers due to different leaders
+    EasyMock.expect(fetcher.start())
+    EasyMock.expect(fetcher.start())
+
+    EasyMock.expect(fetcher.addPartitions(Map(tp1 -> initialFetchState1)))
+      .andReturn(Set(tp1))
+    EasyMock.expect(fetcher.addPartitions(Map(tp2 -> initialFetchState2)))
+      .andReturn(Set(tp2))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(None, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp1), topicIds))
+    EasyMock.expect(fetcher.maybeUpdateTopicIds(Set(tp2), topicIds))
+
+    EasyMock.expect(fetcher.fetchState(tp1))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.expect(fetcher.fetchState(tp2))
+      .andReturn(Some(PartitionFetchState(topicId, fetchOffset, None, leaderEpoch, Truncating, lastFetchedEpoch = None)))
+    EasyMock.replay(fetcher)
+
+    def verifyFetchState(fetchState: Option[PartitionFetchState], expectedTopicId: Option[Uuid]): Unit = {
+      assertTrue(fetchState.isDefined)
+      assertEquals(expectedTopicId, fetchState.get.topicId)
+    }
+
+    fetcherManager.addFetcherForPartitions(Map(tp1 -> initialFetchState1, tp2 -> initialFetchState2))
+    verifyFetchState(fetcher.fetchState(tp1), None)
+    verifyFetchState(fetcher.fetchState(tp2), None)
+
+    fetcherManager.maybeUpdateTopicIds(partitionsToUpdate, topicIds)
+    verifyFetchState(fetcher.fetchState(tp1), topicId)
+    verifyFetchState(fetcher.fetchState(tp2), topicId)
+
+    EasyMock.verify(fetcher)

Review comment:
   Right.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r714547943



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   Right. `updateAndMoveToEnd` removes the item first and then adds it back 
at the end of the `LinkedHashMap`. We do rely on this to rotate the partitions 
in the `LinkedHashMap` in order to treat all partitions fairly. When we use a 
session, this is done on the broker as well btw.
   
   The new `update` method will only update the state and will overwrite any 
existing state in the `LinkedHashMap`. This is what `map.put()` does. However, 
it does not change the order in the `LinkedHashMap` which is good. We don't 
have to change the order when we set the topic id.
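The ordering difference described above can be sketched with `java.util.LinkedHashMap`. `States` is a hypothetical, simplified model, not Kafka's `PartitionStates` class: `update` overwrites in place (re-inserting an existing key does not change insertion order), while `updateAndMoveToEnd` removes the key and re-adds it, which moves it to the end so it is visited last.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

object PartitionOrderSketch {
  // Hypothetical, simplified model of a partition-state container backed by a LinkedHashMap.
  final class States[K, V] {
    private val map = new JLinkedHashMap[K, V]()

    def add(key: K, value: V): Unit = map.put(key, value)

    // Overwrites the value only: re-inserting an existing key keeps its position.
    def update(key: K, value: V): Unit = map.put(key, value)

    // Removes then re-adds: the key moves to the end and is visited last.
    def updateAndMoveToEnd(key: K, value: V): Unit = {
      map.remove(key)
      map.put(key, value)
    }

    def get(key: K): Option[V] = Option(map.get(key))

    // Keys in iteration order (insertion order for this map).
    def keys: List[K] = {
      val it = map.keySet.iterator()
      val builder = List.newBuilder[K]
      while (it.hasNext) builder += it.next()
      builder.result()
    }
  }
}
```

Setting only the topic id with `update` therefore leaves the fairness rotation untouched.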








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-22 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r713916798



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1424,6 +1428,21 @@ class ReplicaManager(val config: KafkaConfig,
   val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
   updateLeaderAndFollowerMetrics(followerTopicSet)
 
+  if (topicIdUpdateFollowerPartitions.nonEmpty)
+    updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)
+
+  leaderAndIsrRequest.partitionStates.forEach { partitionState =>
+    val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+    /*
+     * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+     * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
+     * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
+     * we need to map this topic-partition to OfflinePartition instead.
+     */
+    if (localLog(topicPartition).isEmpty)
+      markPartitionOffline(topicPartition)
+  }

Review comment:
   I think that this code was removed in trunk. Did you bring it back by 
mistake?

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1396,6 +1398,8 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.info(s"Updating log for $topicPartition to assign topic ID " +
           s"$topicId from LeaderAndIsr request from controller $controllerId with correlation " +
           s"id $correlationId epoch $controllerEpoch")
+        if (partitionState.leader != localBrokerId && metadataCache.hasAliveBroker(partitionState.leader))

Review comment:
   nit: I think that I would prefer to have the `metadataCache.hasAliveBroker(partitionState.leader)` check in `updateTopicIdForFollowers` after all. We could do this check when we build the Map at L1778.
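A minimal sketch of doing the alive-broker check while building the partition-to-leader map, in a single pass. `FollowerPartition` and the `hasAliveBroker` function are hypothetical stand-ins for `Partition` and `metadataCache.hasAliveBroker`. Unlike the `getOrElse(-1)` in the hunk that builds `partitionsToUpdateFollowerWithLeader`, this sketch simply skips partitions whose leader is unknown or not alive; that is an assumption about the desired behavior.

```scala
object FollowerTopicIdSketch {
  // Hypothetical stand-ins; not Kafka's Partition or TopicPartition classes.
  final case class TopicPartition(topic: String, partition: Int)
  final case class FollowerPartition(topicPartition: TopicPartition, leaderId: Option[Int])

  /**
   * Builds the topic-partition -> leader id map in one pass, keeping only
   * partitions whose leader is known and reported alive by `hasAliveBroker`.
   */
  def partitionsToUpdate(partitions: Set[FollowerPartition],
                         hasAliveBroker: Int => Boolean): Map[TopicPartition, Int] =
    partitions.collect {
      // The guard performs the liveness check while the map is being built.
      case FollowerPartition(tp, Some(leaderId)) if hasAliveBroker(leaderId) => tp -> leaderId
    }.toMap
}
```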

##
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##
@@ -76,7 +76,13 @@ class ReplicaAlterLogDirsThread(name: String,
 var partitionData: Seq[(TopicPartition, FetchData)] = null
 val request = fetchRequest.build()
 
-val (topicIds, topicNames) = replicaMgr.metadataCache.topicIdInfo()
+val topicIds = new mutable.HashMap[String, Uuid]()

Review comment:
   We should add a small comment here which explains why it is actually OK 
to re-build the mapping from the request. It is not obvious at first.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]): Unit = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    try {
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionStates.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionStates.map { partition =>
+          partition.topicPartition -> partition.leaderReplicaIdOpt.getOrElse(-1)
+        }.toMap
+        replicaFetcherManager.maybeUpdateTopicIds(partitionsToUpdateFollowerWithLeader, topicIds)
+      }
+    } catch {
+      case e: Throwable =>
+        stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
+          s"received from controller $controllerId epoch $controllerEpoch", e)

Review comment:
   Should we update this log to be specific about the topic ID update?

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1739,6 +1758,37 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Set[Partition],
+                                        correlationId: Int,
+

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-17 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r710837733



##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, Int)], topicIds: String => Option[Uuid]): Unit = {

Review comment:
   1) Should we call this one `maybeUpdateTopicIds`? Using `add` feels a bit awkward for the topic id field. Also, the `ToFetcherThread` suffix is not necessary in my opinion because we already know that the fetcher manager is all about managing fetcher threads.
   
   2) Could we use a `Map` instead of `Set`? `Map` feels a bit more natural for 
a key-value pair mapping.
   
   3) Could we add a small scaladoc to explain the parameters?

##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##
@@ -20,7 +20,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaYammerMetrics
 import kafka.utils.TestUtils
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, Uuid}

Review comment:
   Should we add a few unit tests for the newly added methods in this suite and in `AbstractFetcherThreadTest`?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -163,6 +163,16 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, Int)], topicIds: String => Option[Uuid]): Unit = {
+    lock synchronized {
+      val partitionsPerFetcher = partitionsToUpdate.groupMap(partitionsToUpdate => BrokerIdAndFetcherId(partitionsToUpdate._2, getFetcherId(partitionsToUpdate._1)))(_._1)

Review comment:
   We usually avoid using `._1` or `._2` and prefer to deconstruct the tuple in order to name the arguments: `groupMap { case (tp, leader) => `.
   
   Does `groupMap` work with all the versions of Scala that we use?
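A sketch of the grouping step with a deconstructed tuple, written with `groupBy` so that it also compiles on Scala 2.12, where the standard library does not provide `groupMap` (it was added in 2.13; a compatibility library may supply it, which is not assumed here). `TopicPartition`, `BrokerIdAndFetcherId`, and the `getFetcherId` parameter are simplified, hypothetical stand-ins, and the input is a `Map` as suggested above.

```scala
object GroupByFetcherSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and BrokerIdAndFetcherId.
  final case class TopicPartition(topic: String, partition: Int)
  final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

  /**
   * Groups the partitions whose topic ID may need updating by the fetcher that owns them.
   *
   * @param partitionsToUpdate topic-partition -> id of its current leader broker
   * @param getFetcherId       maps a topic-partition to a fetcher id
   * @return partitions grouped by (leader broker id, fetcher id)
   */
  def partitionsPerFetcher(partitionsToUpdate: Map[TopicPartition, Int],
                           getFetcherId: TopicPartition => Int): Map[BrokerIdAndFetcherId, Set[TopicPartition]] =
    // Scala 2.13 only: partitionsToUpdate.groupMap { case (tp, leaderId) =>
    //   BrokerIdAndFetcherId(leaderId, getFetcherId(tp)) } { case (tp, _) => tp }
    partitionsToUpdate
      .groupBy { case (tp, leaderId) => BrokerIdAndFetcherId(leaderId, getFetcherId(tp)) }
      .map { case (key, partitions) => key -> partitions.keySet }
}
```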

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {

Review comment:
   If we rename `addTopicIdsToFetcherThread`, we should also rename this 
one to follow the same naming.

##
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##
@@ -281,14 +281,14 @@ class ReplicaAlterLogDirsThread(name: String,
     val fetchRequestOpt = if (requestMap.isEmpty) {
       None
     } else {
-      val version: Short = if (topicIds.containsKey(tp.topic()))
+      val version: Short = if (fetchState.topicId.isEmpty)
         12
       else
         ApiKeys.FETCH.latestVersion
       // Set maxWait and minBytes to 0 because the response should return immediately if
       // the future log has caught up with the current log of the partition
       val requestBuilder = FetchRequest.Builder.forReplica(version, replicaId, 0, 0, requestMap,
-        topicIds).setMaxBytes(maxBytes)
+        Collections.singletonMap(tp.topic(), fetchState.topicId.getOrElse(Uuid.ZERO_UUID))).setMaxBytes(maxBytes)

Review comment:
   nit: `topic()` -> `topic`.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +491,21 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def maybeAddTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      partitions.foreach { tp =>
+        val currentState = partitionStates.stateValue(tp)
+        if (currentState != null) {
+          val updatedState = currentState.updateTopicId(topicIds(tp.topic))
+          partitionStates.updateAndMoveToEnd(tp, updatedState)

Review comment:
   I think that we should use `update` here instead of 
`updateAndMoveToEnd`. `updateAndMoveToEnd` will basically deprioritize the 
partition and I don't think that we want to do this when we only update its 
topic id.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -2810,4 +2811,132 @@ class ReplicaManagerTest {
   Replicas.NONE, Replicas.NONE, 2, 123, 456,
 replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testPartitionFetchStateUpdatesWithTopicIdAdde

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-16 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r710314809



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+        if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+          // Only change partition state when the leader is available
+          partitionsToUpdateFollower += partition
+        } else {
+          // The leader broker should always be present in the metadata cache.
+          // If not, we should record the error message and abort the transition process for this partition
+          stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+            s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+            s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+            s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+        }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))
+        }
+        replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds)

Review comment:
   I meant that in the above code, you could check if the leader exists 
while iterating over the partitions. We don't have to have another loop before 
to do it.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-16 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r710075575



##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
     info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = {
+    lock synchronized {
+      val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1)
+
+      for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) {
+        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)

Review comment:
   Yeah, that's right. I think that an important point is to let the 
FetcherManager compute the fetcher id based on the topic-partition instead of 
computing it in the replica manager.
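For illustration, a manager-side fetcher-id computation might hash the topic-partition as sketched below. This is modeled on a hash-and-modulo approach and is not presented as a copy of `AbstractFetcherManager.getFetcherId`; `TopicPartition` is a hypothetical stand-in. The relevant property is that the mapping is deterministic given the topic-partition and the number of fetchers per broker, so the replica manager only needs to pass the topic-partition and its leader.

```scala
object FetcherIdSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Non-negative absolute value; Int.MinValue has no positive counterpart, so map it to 0.
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Deterministic: the same topic-partition always maps to the same fetcher id.
  def getFetcherId(tp: TopicPartition, numFetchersPerBroker: Int): Int =
    nonNegative(31 * tp.topic.hashCode + tp.partition) % numFetchersPerBroker
}
```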








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-16 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r710074932



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {
+    val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+    val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+    try {
+      partitionStates.forKeyValue { (partition, partitionState) =>
+        val newLeaderBrokerId = partitionState.leader
+        if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+          // Only change partition state when the leader is available
+          partitionsToUpdateFollower += partition
+        } else {
+          // The leader broker should always be present in the metadata cache.
+          // If not, we should record the error message and abort the transition process for this partition
+          stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+            s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+            s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+            s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+        }
+      }
+
+      if (isShuttingDown.get()) {
+        if (traceLoggingEnabled) {
+          partitionsToUpdateFollower.foreach { partition =>
+            stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+              s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+              s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+              "since it is shutting down")
+          }
+        }
+      } else {
+        val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+          val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+            getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+          val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+          (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))
+        }
+        replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader, topicIds)

Review comment:
   I don't think that this is strictly required. If we get a new request 
with the current epoch, the leader must be there. I think that we could combine 
this with the above code.
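This sketch shows one possible reading of "combine this with the above code". It resolves the leader endpoint once per partition while iterating over the request. The current code instead checks `hasAliveBroker` in one loop and calls `getAliveBrokerNode` again in a second loop. `TopicPartition` and `BrokerEndPoint` here are hypothetical, simplified stand-ins for Kafka's real types, and the metadata cache is reduced to a plain `Map`. The log message and method names are illustrative only.

```scala
object CombinedLeaderLookup {
  // Hypothetical stand-ins for Kafka's TopicPartition and BrokerEndPoint.
  final case class TopicPartition(topic: String, partition: Int)
  final case class BrokerEndPoint(id: Int, host: String, port: Int)

  // A single pass over the request: resolve each partition's leader endpoint
  // and keep only partitions whose leader is alive, logging the others.
  // This replaces "filter first, then look the leader up again".
  def resolveLeaders(
      leaders: Map[TopicPartition, Int],
      aliveBrokers: Map[Int, BrokerEndPoint],
      logError: String => Unit): Map[TopicPartition, BrokerEndPoint] =
    leaders.flatMap { case (tp, leaderId) =>
      aliveBrokers.get(leaderId) match {
        case Some(endPoint) => Some(tp -> endPoint)
        case None =>
          logError(s"Leader $leaderId for $tp is unavailable")
          None
      }
    }.toMap
}
```

The comment says the leader is guaranteed to be present when the epoch is current. If so, the `None` branch could simply be dropped.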








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {

Review comment:
   nit: Should we prefix this method with `maybe` to indicate that it would 
set the topic id only if there is a state for the topic partition?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+  partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)

Review comment:
   Should we ensure that there is actually a state? It must be there but it 
might be better to be safe.
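The two comments above ask for a `maybe` prefix and a guard against a missing state. This is a minimal sketch of that guarded update under a few assumptions. The fetch state is simplified, and a Java `HashMap` stands in for the thread's partition states, so looking up an unknown partition returns `null`. `SketchFetcherThread`, `FetchState` and `addPartition` are hypothetical names, not the PR's code. `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock

object FetcherThreadSketch {
  // Hypothetical, simplified stand-ins; Kafka's PartitionFetchState has more fields.
  final case class TopicPartition(topic: String, partition: Int)
  final case class FetchState(fetchOffset: Long, topicId: Option[UUID])

  class SketchFetcherThread {
    private val partitionMapLock = new ReentrantLock()
    // A Java map, so looking up an unknown partition returns null.
    private val partitionStates = new java.util.HashMap[TopicPartition, FetchState]()

    def addPartition(tp: TopicPartition, state: FetchState): Unit = {
      partitionMapLock.lockInterruptibly()
      try { partitionStates.put(tp, state) } finally partitionMapLock.unlock()
    }

    def fetchState(tp: TopicPartition): Option[FetchState] = {
      partitionMapLock.lockInterruptibly()
      try Option(partitionStates.get(tp)) finally partitionMapLock.unlock()
    }

    // "maybe": only partitions that already have a fetch state are touched;
    // unknown partitions are skipped rather than created.
    def maybeUpdateTopicIds(partitions: Set[TopicPartition], topicIds: String => Option[UUID]): Unit = {
      partitionMapLock.lockInterruptibly()
      try {
        partitions.foreach { tp =>
          // Option(...) turns a null lookup into None, so a missing state is a no-op.
          Option(partitionStates.get(tp)).foreach { current =>
            partitionStates.put(tp, current.copy(topicId = topicIds(tp.topic)))
          }
        }
      } finally partitionMapLock.unlock()
    }
  }
}
```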

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String,
*/
   private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
 if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
-  currentState
+  if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) {
+currentState.updateTopicId(initialFetchState.topicId)
+  } else {
+currentState
+  }

Review comment:
   Is this change still necessary? 

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+topicIds: String => Option[Uuid]) : Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+// Only change partition state when the leader is available
+partitionsToUpdateFollower += partition
+  } else {
+// The leader broker should always be present in the metadata cache.
+// If not, we should record the error message and abort the transition process for this partition
+stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+  s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+  s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+  s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+  }
+  }
+
+  if (isShuttingDown.get()) {
+if (traceLoggingEnabled) {
+  partitionsToUpdateFollower.foreach { partition =>
+stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+  s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+  s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+  "since it is shutting down")
+  }
+}
+  } else {
+val partitionsToUpdateFollowerWithLeader = partitionsToUpdateFollower.map { partition =>
+  val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+  val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+  (partition.topicPartition, BrokerAndFetcherId(leader, replicaFetcherManager.getFetcherId(partition.topicPartition)))

Review comment:
   It looks like `addTopicIdsToFetcherThread` only needs the `leader`, 
the `topic-partition` (to compute the fetcher id), and the `topic id`. How about 
passing just those? I would also let the fetcher manager compute the fetcher id.
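This is a rough sketch of the narrower interface suggested here. The caller hands over only the leader, the topic partition and the topic id, and the manager derives the fetcher id itself before grouping the updates per fetcher thread. The hash formula mirrors what we believe `AbstractFetcherManager.getFetcherId` uses: `31 * topic.hashCode + partition`, modulo the number of fetchers. Treat that formula as an assumption. All types are simplified stand-ins, and `groupByFetcher` is a hypothetical name.

```scala
import java.util.UUID

object FetcherIdSketch {
  // Simplified, hypothetical stand-ins for Kafka's types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class BrokerEndPoint(id: Int, host: String, port: Int)
  final case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)

  // Overflow-safe absolute value: math.abs(Int.MinValue) is still negative.
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Hash-based assignment of a partition to one of numFetchers threads.
  def fetcherId(tp: TopicPartition, numFetchers: Int): Int =
    abs(31 * tp.topic.hashCode + tp.partition) % numFetchers

  // The caller passes only what it knows (leader and topic id per partition);
  // the manager derives the fetcher id and groups the updates per thread.
  def groupByFetcher(
      updates: Map[TopicPartition, (BrokerEndPoint, Option[UUID])],
      numFetchers: Int): Map[BrokerAndFetcherId, Map[TopicPartition, Option[UUID]]] =
    updates
      .groupBy { case (tp, (leader, _)) => BrokerAndFetcherId(leader, fetcherId(tp, numFetchers)) }
      .map { case (key, group) => key -> group.map { case (tp, (_, topicId)) => tp -> topicId } }
}
```

Keeping the fetcher-id computation inside the manager means callers cannot disagree with it about which thread owns a partition.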

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r693836229



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
   I still don't see how a follower would use the AlterIsr API, as a 
follower does not update the ISR; only a leader does.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-23 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r693835412



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+// Only change partition state when the leader is available
+partitionsToUpdateFollower += partition
+  } else {
+// The leader broker should always be present in the metadata cache.
+// If not, we should record the error message and abort the transition process for this partition
+stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
+  s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " +
+  s"(last update controller epoch ${partitionState.controllerEpoch}) " +
+  s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
+  }
+  }
+
+  if (isShuttingDown.get()) {
+if (traceLoggingEnabled) {
+  partitionsToUpdateFollower.foreach { partition =>
+stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " +
+  s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " +
+  s"partition ${partition.topicPartition} with leader ${partitionStates(partition).leader} " +
+  "since it is shutting down")
+  }
+}
+  } else {
+val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToUpdateFollower.map { partition =>
+  val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.
+getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())
+  val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())
+  val log = partition.localLogOrException
+  val fetchOffset = initialFetchOffset(log)
+  partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)
+}.toMap
+
+replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

Review comment:
   I have been thinking about this. At this point, the fetcher is already 
running, so we know that we only want to update the partition state with the 
newly assigned topic id. Creating the `InitialFetchState` seems a bit wasteful 
in this case, no? I wonder if it would be better to add a new method to 
`AbstractFetcherManager` that updates the partition states with the 
newly assigned topic ids instead. Is that something you have already 
considered?
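This sketch illustrates the alternative with hypothetical types. The fetcher-manager method only forwards topic-id updates to fetcher threads that already exist. It builds no `InitialFetchState`, recomputes no fetch offset, and starts no thread. `TopicIdUpdatable` and `FetcherManager` are invented for the example and are not Kafka's API. `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

object FetcherManagerSketch {
  // Hypothetical keys and partitions; Kafka's BrokerAndFetcherId holds a BrokerEndPoint.
  final case class BrokerAndFetcherId(brokerId: Int, fetcherId: Int)
  final case class TopicPartition(topic: String, partition: Int)

  // Minimal view of a running fetcher thread: it can patch topic ids in place.
  trait TopicIdUpdatable {
    def maybeUpdateTopicIds(partitions: Set[TopicPartition], topicIds: String => Option[UUID]): Unit
  }

  class FetcherManager(threads: Map[BrokerAndFetcherId, TopicIdUpdatable]) {
    // Unlike adding partitions, this never creates or restarts a thread:
    // updates addressed to a fetcher that does not exist are ignored.
    def maybeUpdateTopicIds(
        partitionsByFetcher: Map[BrokerAndFetcherId, Set[TopicPartition]],
        topicIds: String => Option[UUID]): Unit =
      partitionsByFetcher.foreach { case (key, partitions) =>
        threads.get(key).foreach(_.maybeUpdateTopicIds(partitions, topicIds))
      }
  }
}
```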








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -82,7 +82,7 @@ class AbstractFetcherThreadTest {
 
 // add one partition to create the consumer lag metric
 fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
-fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
+fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0)))

Review comment:
   nit: If we keep `topicIds` as a Scala map, we would get an Option 
directly with `topicIds.get(...)`. Also, a space is missing after the `,` and 
the `()` could be omitted ;).
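This small example illustrates the first nit. With a Scala `Map`, `get` already returns an `Option`. With a `java.util.Map`, `get` returns `null` for a missing key. Wrapping that result in `Some(...)`, as in `Some(topicIds.get(partition.topic()))`, produces `Some(null)` rather than `None`. The object and values below are made up for the example.

```scala
import java.util.UUID

object OptionLookup {
  val id: UUID = UUID.randomUUID()

  // Scala immutable Map: get already returns Option[UUID].
  val scalaIds: Map[String, UUID] = Map("topic1" -> id)

  // java.util.Map: get returns null for a missing key.
  val javaIds: java.util.Map[String, UUID] = java.util.Collections.singletonMap("topic1", id)

  def fromScala(topic: String): Option[UUID] = scalaIds.get(topic)

  // Option(...) maps null to None; Some(...) would yield Some(null) instead.
  def fromJava(topic: String): Option[UUID] = Option(javaIds.get(topic))
}
```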








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -82,7 +82,7 @@ class AbstractFetcherThreadTest {
 
 // add one partition to create the consumer lag metric
 fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
-fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0)))
+fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0)))

Review comment:
   nit: If we keep `topicIds` as a Scala map, we would get an Option 
directly with `topicIds.get(...)`. Also, a space is missing after the `,`.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692169643



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {

Review comment:
   Intuitively, I would have thought that we don't need this. The broker 
is already a follower in this case, so the leader should be alive. What's your 
take on this?








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692168365



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()

Review comment:
   nit: We could omit specifying the type here and use 
`mutable.Set.empty[Partition]`.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692166774



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?

Review comment:
   I don't think so. Those logs, which are exactly the same as those in 
`makeFollowers`, would be really confusing.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692165645



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : Set[Partition] = {

Review comment:
   Is `topicIds` necessary? I suppose that we could get the `topicId` from 
the `Partition`, couldn't we?
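This sketch shows what dropping the `topicIds` parameter could look like. It assumes a hypothetical, simplified `Partition` that already exposes an `Option` topic id, with `java.util.UUID` standing in for Kafka's `Uuid`. This thread does not confirm that the real `kafka.cluster.Partition` exposes the id this way. `topicIdsByTopic` is an invented helper name.

```scala
import java.util.UUID

object TopicIdFromPartition {
  // Hypothetical, simplified Partition that already knows its topic id;
  // the real kafka.cluster.Partition is different.
  final case class Partition(topic: String, partitionId: Int, topicId: Option[UUID])

  // Collect the ids straight from the partitions, so no separate
  // `topicIds: String => Option[Uuid]` argument has to be threaded through.
  def topicIdsByTopic(partitions: Set[Partition]): Map[String, UUID] =
    partitions.flatMap(p => p.topicId.map(p.topic -> _)).toMap
}
```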








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692164654



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1544,7 +1554,7 @@ class ReplicaManager(val config: KafkaConfig,
   // replica from source dir to destination dir
   logManager.abortAndPauseCleaning(topicPartition)
 
-  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
+  futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic()), leader,

Review comment:
   nit: parentheses are not required after `topic` ;)








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692164057



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1420,6 +1423,10 @@ class ReplicaManager(val config: KafkaConfig,
   }
   val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
 
+  val partitionsToUpdateIdForFollower = topicIdUpdatePartitions.filter { case (_, partitionState) =>
+partitionState.leader != localBrokerId
+  }

Review comment:
   Could we avoid having to re-iterate over `topicIdUpdatePartitions` by 
applying the condition before adding the partition to `topicIdUpdatePartitions`?
   
   I have opened a PR to do the same for the two collections above: 
https://github.com/apache/kafka/pull/11225
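This sketch applies the follower condition while the collection is built, so no second `filter` pass is needed. The `RequestState` type, the `hasNewTopicId` flag and the leader-epoch comparison are hypothetical simplifications of the request handling, not the PR's actual logic.

```scala
import scala.collection.mutable

object FilterWhileBuilding {
  // Hypothetical, simplified per-partition request state.
  final case class RequestState(leader: Int, leaderEpoch: Int, hasNewTopicId: Boolean)

  def collect(
      requests: Map[String, RequestState],
      currentEpoch: String => Int,
      localBrokerId: Int): (Map[String, RequestState], Map[String, RequestState]) = {
    val partitionStates = mutable.Map.empty[String, RequestState]
    val topicIdUpdateForFollower = mutable.Map.empty[String, RequestState]
    requests.foreach { case (tp, state) =>
      val current = currentEpoch(tp)
      if (state.leaderEpoch > current) {
        partitionStates += tp -> state
      } else if (state.leaderEpoch == current && state.hasNewTopicId && state.leader != localBrokerId) {
        // The follower check happens here, so nothing is re-filtered later.
        topicIdUpdateForFollower += tp -> state
      }
    }
    (partitionStates.toMap, topicIdUpdateForFollower.toMap)
  }
}
```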








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692162163



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
   Right. Reusing `makeFollowers` would not work out of the box.
   
   I am not sure I follow your point regarding the possibility of overriding a 
pending ISR state. I thought that only the leader updates the ISR state and that 
followers only update it via `updateAssignmentAndIsr` based on the controller 
state. I might be missing something.
   
   I am not against having `updateTopicIdForFollowers`, but I wanted to ensure 
that we have thought about all the options.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-18 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r690993461



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
   Have we considered relying on `makeFollowers` instead of introducing 
`updateTopicIdForFollowers`? The two methods are really similar. The most 
notable difference is that `makeFollowers` shuts down the fetcher thread. We 
could perhaps optimize this part to skip it if the broker is already a 
follower for the partition.
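This sketch shows the optimization floated here, under hypothetical names. For each partition it decides whether the fetcher must be restarted. If the broker already follows the same leader at the same epoch, only the topic id needs patching. The real `makeFollowers` does considerably more than this. `LocalState` and the two actions are invented for the example.

```scala
object FollowerTransitionSketch {
  sealed trait Action
  case object RestartFetcher extends Action
  case object UpdateTopicIdOnly extends Action

  // Hypothetical snapshot of what the broker already knows about a partition.
  final case class LocalState(isFollower: Boolean, leaderId: Int, leaderEpoch: Int)

  // If the broker already follows the same leader at the same epoch, the
  // fetcher could keep running and only have its topic id patched.
  def decide(local: LocalState, requestedLeaderId: Int, requestedEpoch: Int): Action =
    if (local.isFollower && local.leaderId == requestedLeaderId && local.leaderEpoch == requestedEpoch)
      UpdateTopicIdOnly
    else
      RestartFetcher
}
```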

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -854,6 +859,10 @@ case class PartitionFetchState(fetchOffset: Long,
   s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
   s")"
   }
+
+  def updateTopicId(topicId: Option[Uuid]): PartitionFetchState = {
+PartitionFetchState(topicId, fetchOffset, lag, currentLeaderEpoch, delay, state, lastFetchedEpoch)

Review comment:
   nit: We could use `this.copy(topicId = topicId)`.
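For reference, this is how `copy` behaves on a case class. The example uses a simplified, hypothetical subset of `PartitionFetchState`'s fields, with `java.util.UUID` standing in for Kafka's `Uuid`. Every field not named in the call is carried over unchanged, so the method no longer has to list each field by position.

```scala
import java.util.UUID

object CopySketch {
  // Simplified, hypothetical subset of PartitionFetchState's fields.
  final case class PartitionFetchState(
      topicId: Option[UUID],
      fetchOffset: Long,
      lag: Option[Long],
      currentLeaderEpoch: Int) {
    // copy keeps every other field and replaces only topicId.
    def updateTopicId(topicId: Option[UUID]): PartitionFetchState = this.copy(topicId = topicId)
  }
}
```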

##
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##
@@ -262,7 +263,8 @@ class ReplicaAlterLogDirsThread(name: String,
   private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
 val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
 val partitionsWithError = mutable.Set[TopicPartition]()
-val topicIds = replicaMgr.metadataCache.topicNamesToIds()
+val topicId = fetchState.topicId
+val topicIds = Collections.singletonMap(tp.topic(), topicId.getOrElse(Uuid.ZERO_UUID))

Review comment:
   nit: I would move this closer to `requestBuilder`, as it is only 
used by the request builder, or we could even inline it.
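This sketch shows the expression that would be inlined next to the request builder. It rests on two assumptions: `java.util.UUID` stands in for Kafka's `Uuid`, and `new UUID(0L, 0L)` stands in for `Uuid.ZERO_UUID`. The single-entry map falls back to the zero id when the fetch state has no topic id yet. `SingletonTopicIds` and `topicIdsFor` are hypothetical names.

```scala
import java.util.{Collections, UUID}

object SingletonTopicIds {
  // Stand-in for Uuid.ZERO_UUID (assumption: java.util.UUID replaces Kafka's Uuid).
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // The one-entry map the request builder needs, built at its call site.
  def topicIdsFor(topic: String, topicId: Option[UUID]): java.util.Map[String, UUID] =
    Collections.singletonMap(topic, topicId.getOrElse(ZeroUuid))
}
```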







