dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r715684131
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -672,29 +672,22 @@ class KafkaApis(val requestChannel: RequestChannel, val versionId = request.header.apiVersion val clientId = request.header.clientId val fetchRequest = request.body[FetchRequest] - val (topicIds, topicNames) = + val topicNames = if (fetchRequest.version() >= 13) - metadataCache.topicIdInfo() + metadataCache.topicIdsToNames() else - (Collections.emptyMap[String, Uuid](), Collections.emptyMap[Uuid, String]()) + Collections.emptyMap[Uuid, String]() - // If fetchData or forgottenTopics contain an unknown topic ID, return a top level error. - var fetchData: util.Map[TopicPartition, FetchRequest.PartitionData] = null - var forgottenTopics: util.List[TopicPartition] = null - try { - fetchData = fetchRequest.fetchData(topicNames) - forgottenTopics = fetchRequest.forgottenTopics(topicNames) - } catch { - case e: UnknownTopicIdException => throw e - } + val fetchData = fetchRequest.fetchData(topicNames) + val forgottenTopics = fetchRequest.forgottenTopics(topicNames) Review comment: When a session is used, resolving the topic ids is not really necessary here because we should already have the names in the session or we would resolve them later anyway. I wonder if it would be better to do this entirely in the `fetchManager.newContext` based on the context type. Have you considered something like this? 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() val sessionTopicIds = mutable.Map[String, Uuid]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId) Review comment: Do we still need this `sessionTopicIds` mapping if we have the topic id in the `topicIdPartition`? 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() val sessionTopicIds = mutable.Map[String, Uuid]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId) + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_ID) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - interesting += (topicPartition -> data) + interesting += (topicIdPartition -> data) } } else { - fetchContext.foreachPartition { (part, topicId, _) => - sessionTopicIds.put(part.topic(), topicId) - erroneous += part -> FetchResponse.partitionResponse(part.partition, Errors.TOPIC_AUTHORIZATION_FAILED) + fetchContext.foreachPartition { (topicIdPartition, 
_) => + sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED) } } } else { // Regular Kafka consumers need READ permission on each partition they are fetching. - val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)] - fetchContext.foreachPartition { (topicPartition, topicId, partitionData) => - partitionDatas += topicPartition -> partitionData - sessionTopicIds.put(topicPartition.topic(), topicId) - } - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic) - partitionDatas.foreach { case (topicPartition, data) => - if (!authorizedTopics.contains(topicPartition.topic)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + val partitionDatas = new mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)] + fetchContext.foreachPartition { (topicIdPartition, partitionData) => + partitionDatas += topicIdPartition -> partitionData + sessionTopicIds.put(topicIdPartition.topicPartition.topic(), topicIdPartition.topicId) Review comment: Could we directly check if the topic name is null here and put the unresolved ones into `erroneous`? This would avoid the filter on the next line. 
########## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ########## @@ -233,7 +233,8 @@ class ReplicaFetcherThread(name: String, Map.empty } } else { - fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala + fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala.map { Review comment: Not related to this line. Don't we need to update the fetcher to handle the topic id errors at the partition level? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -163,7 +173,8 @@ class CachedPartition(val topic: String, mustRespond } - override def hashCode: Int = Objects.hash(new TopicPartition(topic, partition), topicId) + override def hashCode: Int = if (topicId != Uuid.ZERO_UUID) (31 * partition) + topicId.hashCode else Review comment: Could we add a scaladoc for this method which explains what we do and why? ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -378,53 +378,47 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque * @param reqMetadata The request metadata. * @param fetchData The partition data from the fetch request. * @param usesTopicIds True if this session should use topic IDs. - * @param topicIds The map from topic names to topic IDs. * @param isFromFollower True if this fetch request came from a follower. 
*/ class FullFetchContext(private val time: Time, private val cache: FetchSessionCache, private val reqMetadata: JFetchMetadata, - private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData], + private val fetchData: util.Map[TopicIdPartition, FetchRequest.PartitionData], private val usesTopicIds: Boolean, - private val topicIds: util.Map[String, Uuid], private val isFromFollower: Boolean) extends FetchContext { - override def getFetchOffset(part: TopicPartition): Option[Long] = + override def getFetchOffset(part: TopicIdPartition): Option[Long] = Option(fetchData.get(part)).map(_.fetchOffset) - override def foreachPartition(fun: (TopicPartition, Uuid, FetchRequest.PartitionData) => Unit): Unit = { - fetchData.forEach((tp, data) => fun(tp, topicIds.get(tp.topic), data)) + override def foreachPartition(fun: (TopicIdPartition, FetchRequest.PartitionData) => Unit): Unit = { + fetchData.forEach((tp, data) => fun(tp, data)) } override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = { - FetchResponse.sizeOf(versionId, updates.entrySet.iterator, topicIds) + FetchResponse.sizeOf(versionId, updates.entrySet.iterator) } override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse = { var hasInconsistentTopicIds = false - def createNewSession: (FetchSession.CACHE_MAP, FetchSession.TOPIC_ID_MAP) = { + def createNewSession: FetchSession.CACHE_MAP = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) - val sessionTopicIds = new util.HashMap[String, Uuid](updates.size) updates.forEach { (part, respData) => if (respData.errorCode() == Errors.INCONSISTENT_TOPIC_ID.code()) { info(s"Session encountered an inconsistent topic ID for topicPartition $part.") hasInconsistentTopicIds = true } val reqData = fetchData.get(part) - val id = topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID) - cachedPartitions.mustAdd(new CachedPartition(part, id, reqData, respData)) - if (id != Uuid.ZERO_UUID) - 
sessionTopicIds.put(part.topic, id) + cachedPartitions.mustAdd(new CachedPartition(part.topicPartition, part.topicId, reqData, respData)) } - (cachedPartitions, sessionTopicIds) + cachedPartitions } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, updates.size, usesTopicIds, () => createNewSession) if (hasInconsistentTopicIds) { - FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP, Collections.emptyMap()) + FetchResponse.of(Errors.INCONSISTENT_TOPIC_ID, 0, responseSessionId, new FetchSession.RESP_MAP) Review comment: Do we still need to return `INCONSISTENT_TOPIC_ID` as a top-level error? Fetchers prior to this change would need it, for sure. With this PR, we actually don't want the fetcher to treat it as a top-level error but rather as a partition error. We need to think/discuss this a little more, I think. ########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -238,47 +247,40 @@ class FetchSession(val id: Int, def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) } - def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized { - Option(partitionMap.find(new CachedPartition(topicPartition, - sessionTopicIds.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID)))).map(_.fetchOffset) + def getFetchOffset(topicIdPartition: TopicIdPartition): Option[Long] = synchronized { + Option(partitionMap.find(new CachedPartition(topicIdPartition.topicPartition, topicIdPartition.topicId))).map(_.fetchOffset) } - type TL = util.ArrayList[TopicPartition] + type TL = util.ArrayList[TopicIdPartition] // Update the cached partition data based on the request. 
def update(fetchData: FetchSession.REQ_MAP, - toForget: util.List[TopicPartition], + toForget: util.List[TopicIdPartition], reqMetadata: JFetchMetadata, - topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = synchronized { + usesTopicIds: Boolean): (TL, TL, TL) = synchronized { val added = new TL val updated = new TL val removed = new TL - val inconsistentTopicIds = new TL fetchData.forEach { (topicPart, reqData) => - // Get the topic ID on the broker, if it is valid and the topic is new to the session, add its ID. - // If the topic already existed, check that its ID is consistent. - val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID) - val newCachedPart = new CachedPartition(topicPart, id, reqData) - if (id != Uuid.ZERO_UUID) { - val prevSessionTopicId = sessionTopicIds.putIfAbsent(topicPart.topic, id) - if (prevSessionTopicId != null && prevSessionTopicId != id) - inconsistentTopicIds.add(topicPart) - } + val newCachedPart = new CachedPartition(topicPart.topicPartition, topicPart.topicId, reqData) val cachedPart = partitionMap.find(newCachedPart) if (cachedPart == null) { partitionMap.mustAdd(newCachedPart) added.add(topicPart) } else { cachedPart.updateRequestParams(reqData) + if (cachedPart.topic == null) + // Update the topic name in place + cachedPart.resolveUnknownName(topicPart.topicPartition.topic) Review comment: This might not be necessary if we won't resolve topic ids in the request in all cases (see my previous comment). 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,45 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() val sessionTopicIds = mutable.Map[String, Uuid]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + sessionTopicIds.put(topicIdPartition.topicPartition.topic, topicIdPartition.topicId) + if (topicIdPartition.topicPartition.topic == null ) + erroneous += topicIdPartition -> FetchResponse.partitionResponse(topicIdPartition.topicPartition.partition, Errors.UNKNOWN_TOPIC_ID) Review comment: nit: Could we add an overload to `partitionResponse` which takes a `TopicIdPartition`? This would reduce the boilerplate code a bit here. 
########## File path: core/src/main/scala/kafka/server/FetchSession.scala ########## @@ -173,8 +184,8 @@ class CachedPartition(val topic: String, this.eq(that) || (that.canEqual(this) && this.partition.equals(that.partition) && - this.topic.equals(that.topic) && - this.topicId.equals(that.topicId)) + (if (this.topicId != Uuid.ZERO_UUID) this.topicId.equals(that.topicId) + else this.topic.equals(that.topic))) Review comment: Side note here: I think that we should implement `override def elementKeysAreEqual(that: Any): Boolean` from the `ImplicitLinkedHashCollection.Element` interface to make it clear that we do this for comparing elements in the collections. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -870,12 +868,14 @@ class KafkaApis(val requestChannel: RequestChannel, // Prepare fetch response from converted data val response = - FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData, sessionTopicIds.asJava) + FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData) // record the bytes out metrics only when the response is being sent response.data().responses().forEach { topicResponse => topicResponse.partitions().forEach { data => - val tp = new TopicPartition(topicResponse.topic(), data.partitionIndex()) - brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data)) + val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic(), data.partitionIndex())) + // If the topic name was not known, we will have no bytes out. + if (tp.topicPartition.topic != null) Review comment: Should we create `tp` after this check? We could also create a `TopicPartition` as we don't really use `TopicIdPartition` for the metric. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org