jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r718910814



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -173,8 +184,8 @@ class CachedPartition(val topic: String,
         this.eq(that) ||
           (that.canEqual(this) &&
             this.partition.equals(that.partition) &&
-            this.topic.equals(that.topic) &&
-            this.topicId.equals(that.topicId))
+            (if (this.topicId != Uuid.ZERO_UUID) 
this.topicId.equals(that.topicId)
+            else this.topic.equals(that.topic)))

Review comment:
       It seems like `elementKeysAreEqual` is currently just `equals`. Is the 
idea behind implementing it here to prevent someone else from implementing it 
later without using `equals` (or the logic from `equals`)?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -173,8 +184,8 @@ class CachedPartition(val topic: String,
         this.eq(that) ||
           (that.canEqual(this) &&
             this.partition.equals(that.partition) &&
-            this.topic.equals(that.topic) &&
-            this.topicId.equals(that.topicId))
+            (if (this.topicId != Uuid.ZERO_UUID) 
this.topicId.equals(that.topicId)
+            else this.topic.equals(that.topic)))

Review comment:
       Or is it that, because the javadoc says things like 
`key.elementKeysAreEqual(e) and key.hashCode() == e.hashCode()`, we should be 
using `elementKeysAreEqual` in `FetchSession`?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -378,53 +378,47 @@ class SessionlessFetchContext(val fetchData: 
util.Map[TopicPartition, FetchReque
   * @param reqMetadata        The request metadata.
   * @param fetchData          The partition data from the fetch request.
   * @param usesTopicIds       True if this session should use topic IDs.
-  * @param topicIds           The map from topic names to topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+                       private val fetchData: util.Map[TopicIdPartition, 
FetchRequest.PartitionData],
                        private val usesTopicIds: Boolean,
-                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends 
FetchContext {
-  override def getFetchOffset(part: TopicPartition): Option[Long] =
+  override def getFetchOffset(part: TopicIdPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, Uuid, 
FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
+  override def foreachPartition(fun: (TopicIdPartition, 
FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: 
Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): 
FetchResponse = {
     var hasInconsistentTopicIds = false
-    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) 
= {
+    def createNewSession: FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
-      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
         if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
           info(s"Session encountered an inconsistent topic ID for 
topicPartition $part.")
           hasInconsistentTopicIds = true
         }
         val reqData = fetchData.get(part)
-        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
-        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, 
respData))
-        if (id != Uuid.ZERO_UUID)
-          sessionTopicIds.put(part.topic, id)
+        cachedPartitions.mustAdd(new CachedPartition(part.topicPartition, 
part.topicId, reqData, respData))
       }
-      (cachedPartitions, sessionTopicIds)
+      cachedPartitions
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), 
isFromFollower,
         updates.size, usesTopicIds, () => createNewSession)
     if (hasInconsistentTopicIds) {
-      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new 
FetchSession.RESP_MAP, Collections.emptyMap())
+      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new 
FetchSession.RESP_MAP)

Review comment:
       ^ this is still something we need to resolve.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to