jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r567189128
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -237,14 +317,80 @@ class FetchSession(val id: Int, type TL = util.ArrayList[TopicPartition] // Update the cached partition data based on the request. - def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], - reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized { + def update(version: Short, + fetchDataAndError: FetchDataAndError, + toForgetAndIds: ToForgetAndIds, + reqMetadata: JFetchMetadata, + topicIds: util.Map[String, Uuid], + topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL - fetchData.forEach { (topicPart, reqData) => - val newCachedPart = new CachedPartition(topicPart, reqData) + + // Only make changes to topic IDs if we have a new request version. + // If we receive an old request version, ignore all topic ID code, keep IDs that are there. + if (version >= 13) { + val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID + val unresolvedIterator = unresolvedPartitions.iterator() + while (unresolvedIterator.hasNext()) { + val partition = unresolvedIterator.next() + + // Remove from unresolvedPartitions if ID is unresolved in toForgetIds + val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId) + if (forgetPartitions != null && forgetPartitions.contains(partition.partition)) + unresolvedIterator.remove() + + // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap + // and remove from unresolvedPartitions. 
+ else if (topicNames.get(partition.topicId) != null) { + val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition) + val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData) + partitionMap.add(newCp) + added.add(newTp) + unresolvedIterator.remove() + } else { + val idError = fetchDataAndError.idErrors.get(partition.topicId) + if (idError == null) { + fetchDataAndError.idErrors.put(partition.topicId, new FetchResponse.IdError(partition.topicId, Collections.singletonList(partition.partition), error)) + } else { + idError.addPartitions(Collections.singletonList(partition.partition)) + } + } + } + + // We will also want to check topic ID here to see if the request matches what we have "on file". + // 1. If the current ID in a cached partition is Uuid.ZERO_UUID, and we have a valid + // ID in topic IDs, simply add the ID. If there is not a valid ID, keep as Uuid.ZERO_UUID. + // 2. If we have a situation where there is a valid ID on the partition, but it does not match + // the ID in topic IDs (likely due to topic deletion and recreation) or there is no valid topic + // ID on the broker (topic deleted or broker received a metadataResponse without IDs), + // remove the cached partition from partitionMap. + /**val partitionIterator = partitionMap.iterator() Review comment: I should also move that comment (and maybe simplify it) to the PartitionIterator where I moved the code for removing partitions with stale IDs. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org