showuon commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1112773897


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   Why can't we use `topicsById.values()` as before?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -38,15 +40,21 @@
  */
 public final class TopicsImage {
     public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+        new TopicsImage(map(), map());
+
+    final ImMap<Uuid, TopicImage> topicsById;
+    final ImMap<String, TopicImage> topicsByName;
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    public TopicsImage(ImMap<Uuid, TopicImage> topicsById,
+                       ImMap<String, TopicImage> topicsByName) {
+        this.topicsById = topicsById;
+        this.topicsByName = topicsByName;
+    }
 
-    public TopicsImage(Map<Uuid, TopicImage> topicsById,
-                       Map<String, TopicImage> topicsByName) {
-        this.topicsById = Collections.unmodifiableMap(topicsById);
-        this.topicsByName = Collections.unmodifiableMap(topicsByName);
+    public TopicsImage including(TopicImage topic) {

Review Comment:
   Looks like this is only used in test. Could this be in `protected` scope?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##########
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion 
newVersion) {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new 
HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new 
HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+        ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = 
image.topicsById.get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.without(topicId);
+                newTopicsByName = 
newTopicsByName.without(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.assoc(topicId, 
newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.assoc(topicName, 
newTopicToBeAddedOrUpdated);

Review Comment:
   From the 
[javadoc](https://javadoc.io/doc/org.organicdesign/Paguro/latest/org/organicdesign/fp/collections/ImMap.html#assoc(K,V)),
 I'm not sure if the `assoc` method will do update with the same key, not 
adding a new one. Could you confirm that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to