dajac commented on code in PR #17444:
URL: https://github.com/apache/kafka/pull/17444#discussion_r1810210022


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -72,7 +71,8 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
-        return Collections.emptySet();
+        TopicMetadata topic = this.topicMetadata.get(topicId);
+        return topic == null ? Set.of() : topic.partitionRacks().getOrDefault(partition, Set.of());

Review Comment:
   I suppose that this won't work because the `topicMetadata` is no longer 
maintained in the group. Hence you are not guaranteed to have it when the 
assignor is called. I was hoping that we could reuse the `MetadataImage`.
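
To illustrate the idea of resolving racks from a cluster-wide snapshot instead of per-group state, here is a minimal sketch. `ClusterSnapshot` and its two maps are hypothetical stand-ins invented for this example; they are not the `MetadataImage` API, whose actual accessors would need to be checked in the Kafka code base. `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

final class ImageBackedRackLookup {

    /** Hypothetical stand-in for a metadata snapshot; NOT Kafka's MetadataImage. */
    static final class ClusterSnapshot {
        // topic id -> partition index -> replica broker ids
        final Map<UUID, Map<Integer, int[]>> replicasByTopic;
        // broker id -> rack, empty when the broker has no rack configured
        final Map<Integer, Optional<String>> rackByBroker;

        ClusterSnapshot(Map<UUID, Map<Integer, int[]>> replicasByTopic,
                        Map<Integer, Optional<String>> rackByBroker) {
            this.replicasByTopic = replicasByTopic;
            this.rackByBroker = rackByBroker;
        }
    }

    private final ClusterSnapshot snapshot;

    ImageBackedRackLookup(ClusterSnapshot snapshot) {
        this.snapshot = snapshot;
    }

    /** Returns the racks of the replicas of a partition, or an empty set if unknown. */
    Set<String> racksForPartition(UUID topicId, int partition) {
        Map<Integer, int[]> partitions = snapshot.replicasByTopic.get(topicId);
        if (partitions == null) return Set.of();   // unknown topic
        int[] replicas = partitions.get(partition);
        if (replicas == null) return Set.of();     // unknown partition

        Set<String> racks = new HashSet<>();
        for (int brokerId : replicas) {
            Optional<String> rack = snapshot.rackByBroker.get(brokerId);
            if (rack != null) rack.ifPresent(racks::add); // skip unknown or rackless brokers
        }
        return Collections.unmodifiableSet(racks);
    }
}
```

The point of the sketch is only that the lookup depends on the snapshot passed in, so it keeps working even when the group itself no longer stores topic metadata.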



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupMetadataValue.json:
##########
@@ -21,6 +21,8 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "Epoch", "versions": "0+", "type": "int32",
-      "about": "The group epoch." }
+      "about": "The group epoch." },
+    { "name":  "SubscriptionMetadataHash", "versions":  "0+", "type":  "int64",
+      "about":  "The hash of subscription metadata." }

Review Comment:
   Unfortunately, we cannot add a new field like this because it breaks 
backward compatibility. Instead, we should use a tagged field.
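
As a rough sketch of what a tagged variant of the field might look like: the `taggedVersions` and `tag` attributes follow the conventions of Kafka's message schema format as I understand them, and the tag number `0` is an assumption (it must be a tag not already used by this record). Please verify both against the message generator documentation before relying on this.

```json
{ "name": "SubscriptionMetadataHash", "versions": "0+", "type": "int64",
  "taggedVersions": "0+", "tag": 0,
  "about": "The hash of subscription metadata." }
```

A tagged field is optional on the wire, so readers that do not know about it can skip it, which is what makes the change compatible.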



##########
clients/src/main/java/org/apache/kafka/common/Uuid.java:
##########
@@ -31,7 +31,7 @@
  * This is the same type of UUID as the ones generated by java.util.UUID. The toString() method prints
  * using the base64 string encoding. Likewise, the fromString method expects a base64 string encoding.
  */
-public class Uuid implements Comparable<Uuid> {
+public class Uuid implements Comparable<Uuid>, java.io.Serializable {

Review Comment:
   I don't feel comfortable with this change. `Uuid` is part of our public API 
so we would have to keep it forever.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -235,4 +239,8 @@ public static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion
             return apiMessageAndVersion.message();
         }
     }
+
+    public static long hashSubscriptionMetadata(Map<String, TopicMetadata> subscriptionMetadata) {
+        return Murmur3.hash64(new TreeMap<>(subscriptionMetadata).toString().getBytes(StandardCharsets.UTF_8));

Review Comment:
   This seems pretty inefficient. I wonder if we could compute the hash 
incrementally while we iterate over the (sorted) topic names. Then, for each 
topic, we could add the topic id, the number of partitions and the (sorted) 
racks of each partition. It would avoid building the TreeMap and avoid 
converting it to a String only to finally transform it into bytes. Have you 
considered it?
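
A minimal sketch of the incremental approach, under stated assumptions: `TopicInfo` is a hypothetical stand-in for `TopicMetadata`, `java.util.UUID` stands in for Kafka's `Uuid`, and 64-bit FNV-1a is swapped in for Murmur3 purely because it can be updated one byte at a time with no dependencies. It is not the hash used in the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class IncrementalSubscriptionHash {

    /** Hypothetical stand-in for TopicMetadata; field names are assumptions. */
    static final class TopicInfo {
        final UUID id;
        final int numPartitions;
        final Map<Integer, Set<String>> partitionRacks;

        TopicInfo(UUID id, int numPartitions, Map<Integer, Set<String>> partitionRacks) {
            this.id = id;
            this.numPartitions = numPartitions;
            this.partitionRacks = partitionRacks;
        }
    }

    // 64-bit FNV-1a constants (stand-in for Murmur3, see the note above).
    private static final long OFFSET_BASIS = 0xcbf29ce484222325L;
    private static final long PRIME = 0x100000001b3L;

    private long state = OFFSET_BASIS;

    private void addByte(int b) {
        state ^= (b & 0xffL);
        state *= PRIME;
    }

    private void addInt(int v) {
        for (int shift = 0; shift < 32; shift += 8) addByte(v >>> shift);
    }

    private void addLong(long v) {
        for (int shift = 0; shift < 64; shift += 8) addByte((int) (v >>> shift));
    }

    private void addString(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        addInt(bytes.length); // length prefix keeps "ab" + "c" distinct from "a" + "bc"
        for (byte b : bytes) addByte(b);
    }

    /** Feeds each topic into the hash in sorted name order, without building a String. */
    static long hash(Map<String, TopicInfo> topics) {
        IncrementalSubscriptionHash h = new IncrementalSubscriptionHash();
        String[] names = topics.keySet().toArray(new String[0]);
        Arrays.sort(names); // deterministic order regardless of map implementation
        for (String name : names) {
            TopicInfo topic = topics.get(name);
            h.addString(name);
            h.addLong(topic.id.getMostSignificantBits());
            h.addLong(topic.id.getLeastSignificantBits());
            h.addInt(topic.numPartitions);
            for (int p = 0; p < topic.numPartitions; p++) {
                String[] racks = topic.partitionRacks.getOrDefault(p, Set.of()).toArray(new String[0]);
                Arrays.sort(racks); // rack order within a partition must not affect the hash
                h.addInt(racks.length);
                for (String rack : racks) h.addString(rack);
            }
        }
        return h.state;
    }
}
```

Sorting the names and the per-partition racks keeps the result independent of map and set iteration order, which is the property the `TreeMap` was providing in the original version.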


