[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206732981

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
I don't recall the details now. I have to get back to it...

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1170980408

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,

Review Comment:
It could if the IBP is kept on an old version.
[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1169911272

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ##
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,

Review Comment:
I have been thinking about this part of the code and I don't think that we can treat it like this. The first thing to note is that we hit this path when all partitions are requested. In this case, it feels weird to return a partition with a zero topic id and `OffsetFetchResponse.UNKNOWN_PARTITION`. I think that we should just ignore unknown partitions in this case. Note that partitions are deleted from the cache when the topic is deleted.
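To make the "ignore unknown partitions" suggestion concrete, here is a minimal sketch of the all-partitions path. It is not the PR's code: `TopicPartition`, `TopicIdPartition` and `PartitionData` are hypothetical stand-ins for the Kafka classes, the topic id is a plain `String` rather than a `Uuid`, and `fetchAllOffsets` is an illustrative name. It assumes the metadata snapshot carries topic ids at all, so it does not cover a broker on an IBP without topic id support.

```scala
// Hypothetical stand-ins for Kafka's TopicPartition, TopicIdPartition and
// PartitionData so the sketch runs without Kafka on the classpath.
// The topic id is modelled as a plain String instead of Kafka's Uuid.
final case class TopicPartition(topic: String, partition: Int)
final case class TopicIdPartition(topicId: String, topicPartition: TopicPartition)
final case class PartitionData(offset: Long, error: String)

object FetchAllOffsetsSketch {
  // All-partitions path: resolve each committed partition to its topic id and
  // drop partitions whose topic has no id in the metadata snapshot, instead of
  // returning them with a zero topic id and an UNKNOWN_PARTITION entry.
  def fetchAllOffsets(
    committedOffsets: Map[TopicPartition, Long],
    topicNamesToIds: Map[String, String]
  ): Map[TopicIdPartition, PartitionData] =
    committedOffsets.flatMap { case (topicPartition, offset) =>
      topicNamesToIds.get(topicPartition.topic).map { topicId =>
        TopicIdPartition(topicId, topicPartition) -> PartitionData(offset, "NONE")
      }
    }
}
```

With committed offsets for a known topic and for a topic missing from the snapshot, only the known topic's partition appears in the result; the other entry is skipped rather than reported with a placeholder id.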
The second thing to note is that we can't ignore partitions with unknown topic ids all the time because the broker may be on an IBP which does not support topic ids. I think that we have to reason about it based on the version o