junrao commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r573102028
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param time The clock to use. * @param cache The fetch session cache. * @param reqMetadata The request metadata. - * @param fetchData The partition data from the fetch request. + * @param fetchDataAndError The partition data and topic ID errors from the fetch request. + * @param topicIds The map from topic name to topic IDs * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, - private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val fetchDataAndError: FetchDataAndError, + private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { + val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values()) override def getFetchOffset(part: TopicPartition): Option[Long] = - Option(fetchData.get(part)).map(_.fetchOffset) + Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset) override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = { - fetchData.forEach(fun(_, _)) + fetchDataAndError.fetchData.forEach(fun(_, _)) Review comment: Yes, we want to return UNKNOWN_TOPIC_ID for all partitions in this case. The unresolved partitions in this PR could also return the unsupported error code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org