jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572367849



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.
+        else if (topicNames.get(partition.topicId) != null) {
+          val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition)
+          val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData)
+          partitionMap.add(newCp)
+          added.add(newTp)
+          unresolvedIterator.remove()
+        } else {
+          val idError = fetchDataAndError.idErrors.get(partition.topicId)

Review comment:
       This is for adding unresolved partitions that are in the session but not in the request. I can add comments to clarify what is happening.
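
To make the three cases in the loop easier to follow, here is a minimal sketch using simplified stand-in types rather than the real Kafka classes. `TopicId`, `Unresolved`, `Resolved`, and `UnresolvedSketch.process` are hypothetical names for illustration only. The third branch assumes the partition is recorded under its topic ID in an error map, which is one reading of the comment above; the hunk is cut off before that code, so the actual PR may differ.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not the real Kafka classes.
final case class TopicId(value: String)
final case class Unresolved(topicId: TopicId, partition: Int)
final case class Resolved(topic: String, partition: Int)

object UnresolvedSketch {
  // Walks the session's unresolved partitions and applies one of three cases:
  //   1. the request asks to forget the partition: drop it
  //   2. the topic ID now maps to a name: resolve it and report it as added
  //   3. otherwise: keep it unresolved and record it in idErrors (assumed behaviour)
  def process(
      unresolved: mutable.Set[Unresolved],
      toForgetIds: Map[TopicId, Set[Int]],
      topicNames: Map[TopicId, String],
      idErrors: mutable.Map[TopicId, mutable.Set[Int]]): List[Resolved] = {
    val added = mutable.ListBuffer.empty[Resolved]
    // Iterate over a snapshot so that removing from the set is safe.
    for (p <- unresolved.toList) {
      if (toForgetIds.get(p.topicId).exists(_.contains(p.partition))) {
        unresolved -= p
      } else {
        topicNames.get(p.topicId) match {
          case Some(name) =>
            added += Resolved(name, p.partition)
            unresolved -= p
          case None =>
            // In the session but not in this request: surface it under its topic ID.
            idErrors.getOrElseUpdate(p.topicId, mutable.Set.empty[Int]) += p.partition
        }
      }
    }
    added.toList
  }
}
```

With one partition in each case, only the partition whose ID is still unknown stays in `unresolved`, and it is the only one that ends up in `idErrors`.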



