[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1206991344 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { -trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = { +trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { - val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) - - topicPartitions.map { topicPartition => -if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { - topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, + def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = { +if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) { + new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { - val partitionData = group.offset(topicPartition) match { + group.offset(topicIdPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } - topicPartition -> partitionData } - }.toMap + } + + topicIdPartitionsOpt match { +case Some(topicIdPartitions) => + topicIdPartitions.map { topicIdPartition => +topicIdPartition -> resolvePartitionData(topicIdPartition) + }.toMap + +case None => + val topicIds = replicaManager.metadataCache.topicNamesToIds() + group.allOffsets.keySet.map { topicPartition => +Option(topicIds.get(topicPartition.topic())) match { + case Some(topicId) => +val topicIdPartition = new TopicIdPartition(topicId, topicPartition) +topicIdPartition -> resolvePartitionData(topicIdPartition) + case None => +val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition) +zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION +} + }.toMap Review Comment: did we decide that topicIDPartition when the request is old would just have a 0 id? Could you give a brief outline for old vs new request versions and how they are handled (ie representation in memory when handling + what we return in the response for happy path and error cases) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1170250900 ## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ## @@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int, * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either * returns the current offset or it begins to sync the cache from the log (and returns an error code). */ - def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { -trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = { +trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId)) val group = groupMetadataCache.get(groupId) if (group == null) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { group.inLock { if (group.is(Dead)) { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) -topicPartition -> partitionData +topicIdPartition -> partitionData }.toMap } else { - val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) - - topicPartitions.map { topicPartition => -if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { - topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, + def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = { +if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) { + new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) } else { - val partitionData = group.offset(topicPartition) match { + group.offset(topicIdPartition) match { case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) } - topicPartition -> partitionData } - }.toMap + } + + topicIdPartitionsOpt match { +case Some(topicIdPartitions) => + topicIdPartitions.map { topicIdPartition => +topicIdPartition -> resolvePartitionData(topicIdPartition) + }.toMap + +case None => + val topicIds = replicaManager.metadataCache.topicNamesToIds() + group.allOffsets.keySet.map { topicPartition => +Option(topicIds.get(topicPartition.topic())) match { + case Some(topicId) => +val topicIdPartition = new TopicIdPartition(topicId, topicPartition) +topicIdPartition -> resolvePartitionData(topicIdPartition) + case None => +val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition) +zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION +} + }.toMap Review Comment: Were we expecting to use this code path when the IBP is less than 2.8? I guess I assumed that the IBP would be higher. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1163210151 ## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ## @@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState expiredOffsets } - def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => -(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) + def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => +(new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata) Review Comment: No worries :) I think for other protocols we return with UNKNOWN_TOPIC_ID in KafkaApis when doing the conversion, which would typically be retriable. But would have to look into the details here. Might not be as simple since KafkaApis is a place where we can return with an error easily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1158947529 ## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ## @@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState expiredOffsets } - def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => -(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) + def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => +(new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata) Review Comment: What cases are we considering? A topic should have an ID as long as controller is ibp 2.8+ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1158817715 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter( topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics], requireStable: Boolean ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = { -val topicPartitions = new mutable.ArrayBuffer[TopicPartition]() +val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]() topics.forEach { topic => topic.partitionIndexes.forEach { partition => -topicPartitions += new TopicPartition(topic.name, partition) +topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name) Review Comment: I assume this will be replaced with topic.topicId? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1158816855 ## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ## @@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState expiredOffsets } - def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => -(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata) + def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) => +(new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata) Review Comment: Where will the topic ID come from here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
jolshan commented on code in PR #13493: URL: https://github.com/apache/kafka/pull/13493#discussion_r1158813853 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ## @@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter( topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics], requireStable: Boolean ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = { -val topicPartitions = new mutable.ArrayBuffer[TopicPartition]() +val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]() topics.forEach { topic => topic.partitionIndexes.forEach { partition => -topicPartitions += new TopicPartition(topic.name, partition) +topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name) Review Comment: Is this an intermediate step before we add the actual topic IDs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org