jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r715743267
##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -238,47 +247,40 @@ class FetchSession(val id: Int,
def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
- def getFetchOffset(topicPartition: TopicPartition): Option[Long] =
synchronized {
- Option(partitionMap.find(new CachedPartition(topicPartition,
- sessionTopicIds.getOrDefault(topicPartition.topic(),
Uuid.ZERO_UUID)))).map(_.fetchOffset)
+ def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] =
synchronized {
+ Option(partitionMap.find(new
CachedPartition(topicIdPartition.topicPartition,
topicIdPartition.topicId))).map(_.fetchOffset)
}
- type TL = util.ArrayList[TopicPartition]
+ type TL = util.ArrayList[TopicIdPartition]
// Update the cached partition data based on the request.
def update(fetchData: FetchSession.REQ_MAP,
- toForget: util.List[TopicPartition],
+ toForget: util.List[TopicIdPartition],
reqMetadata: JFetchMetadata,
- topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) =
synchronized {
+ usesTopicIds: Boolean): (TL, TL, TL) = synchronized {
val added = new TL
val updated = new TL
val removed = new TL
- val inconsistentTopicIds = new TL
fetchData.forEach { (topicPart, reqData) =>
- // Get the topic ID on the broker, if it is valid and the topic is new
to the session, add its ID.
- // If the topic already existed, check that its ID is consistent.
- val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
- val newCachedPart = new CachedPartition(topicPart, id, reqData)
- if (id != Uuid.ZERO_UUID) {
- val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic,
id)
- if (prevSessionTopicId != null && prevSessionTopicId != id)
- inconsistentTopicIds.add(topicPart)
- }
+ val newCachedPart = new CachedPartition(topicPart.topicPartition,
topicPart.topicId, reqData)
val cachedPart = partitionMap.find(newCachedPart)
if (cachedPart == null) {
partitionMap.mustAdd(newCachedPart)
added.add(topicPart)
} else {
cachedPart.updateRequestParams(reqData)
+ if (cachedPart.topic == null)
+ // Update the topic name in place
+ cachedPart.resolveUnknownName(topicPart.topicPartition.topic)
Review comment:
I'm not sure I follow here. We have an unresolved partition in the
session and we are updating it.
Why would we not resolve the partition? I suppose it will get picked up by
the forEach partition-resolving process, but I'm not sure how the earlier
comment applies here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]