Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1171099569


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should 
never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log 
(and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: 
Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group 
%s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, 
topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, 
PartitionData] = {
+    trace("Getting offsets of %s for group 
%s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
         val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { 
topicIdPartition =>
             val partitionData = new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = 
topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new 
PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): 
PartitionData = {
+            if (requireStable && 
group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, 
Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, 
topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, 
topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Hi David, thanks for taking the time to lay out these thoughts. I agree with 
you that using the zero id does not feel right. It can be considered as an 
abusive use of the `TopicIdPartition` as an invalid reference to a resource in 
the cluster. In the worst case, an invalid `TopicIdPartition` could end up 
being used somewhere else which treats all `TopicIdPartition` as if they were 
valid/resolved. So, there is a clear code smell here.
   
   In any case, the caller of the method `getOffsets` should know that some of 
the `TopicIdPartition` are invalid (that is, references the zero id). This 
defeats the purpose of returning a reference based on topic-id-partition. So, I 
agree with you that using topic-id-partition should not be exposed in the value 
returned by `getOffsets` even though there is an asymmetry in the method 
prototype as a result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to