jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r719668223
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ##########
@@ -378,53 +378,47 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
  * @param reqMetadata The request metadata.
  * @param fetchData The partition data from the fetch request.
  * @param usesTopicIds True if this session should use topic IDs.
- * @param topicIds The map from topic names to topic IDs.
  * @param isFromFollower True if this fetch request came from a follower.
  */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val fetchData: util.Map[TopicIdPartition, FetchRequest.PartitionData],
                        private val usesTopicIds: Boolean,
-                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
-  override def getFetchOffset(part: TopicPartition): Option[Long] =
+  override def getFetchOffset(part: TopicIdPartition): Option[Long] =
     Option(fetchData.get(part)).map(_.fetchOffset)
 
-  override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data))
+  override def foreachPartition(fun: (TopicIdPartition, FetchRequest.PartitionData) => Unit): Unit = {
+    fetchData.forEach((tp, data) => fun(tp, data))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = {
     var hasInconsistentTopicIds = false
-    def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = {
+    def createNewSession: FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
-      val sessionTopicIds = new util.HashMap[String, Uuid](updates.size)
       updates.forEach { (part, respData) =>
         if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) {
           info(s"Session encountered an inconsistent topic ID for topicPartition $part.")
           hasInconsistentTopicIds = true
         }
         val reqData = fetchData.get(part)
-        val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)
-        cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData))
-        if (id != Uuid.ZERO_UUID)
-          sessionTopicIds.put(part.topic, id)
+        cachedPartitions.mustAdd(new CachedPartition(part.topicPartition, part.topicId, reqData, respData))
       }
-      (cachedPartitions, sessionTopicIds)
+      cachedPartitions
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, updates.size,
       usesTopicIds, () => createNewSession)
     if (hasInconsistentTopicIds) {
-      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP, Collections.emptyMap())
+      FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP)

Review comment:

For the replica fetcher, we could choose not to delay partitions with this error. It seems that in the fetcher we just choose whether to update metadata, so maybe this won't be too difficult. Alternatively, we could change the fetching flow to carry the topic ID earlier in the process, so that we can include it in the error response as well. That would be a lot of work. I still need to think through the current setup to make sure we aren't losing critical data in this state.
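To make the first option concrete, here is a minimal sketch of a fetcher-side policy that treats `INCONSISTENT_TOPIC_ID` as "refresh metadata and retry" rather than delaying the partition. `FetcherErrorHandling`, `Action`, and `onPartitionError` are hypothetical names invented for illustration; this is not Kafka's actual `AbstractFetcherThread` error-handling path, only the shape of the decision being discussed:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors

// Hypothetical policy object, not part of Kafka's fetcher code.
object FetcherErrorHandling {

  sealed trait Action
  // Trigger a metadata update and keep the partition fetchable.
  case object RetryAfterMetadataUpdate extends Action
  // Back the partition off before the next fetch attempt.
  case object DelayPartition extends Action

  def onPartitionError(tp: TopicPartition, error: Errors): Action = error match {
    case Errors.INCONSISTENT_TOPIC_ID =>
      // The broker's topic ID diverged from ours (e.g. the topic was
      // deleted and recreated). A metadata refresh resolves the current
      // ID, so there is no need to delay the partition itself.
      RetryAfterMetadataUpdate
    case _ =>
      // Conservative default for other partition-level errors.
      DelayPartition
  }
}
```

The appeal of this shape is that the fetcher already decides per error whether to request a metadata update, so routing `INCONSISTENT_TOPIC_ID` into that branch would be a small change compared with threading the topic ID through the whole fetch flow.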