hachikuji commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r670046323
########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); + // If both topic IDs were valid and the topic ID changed, update the metadata Review comment: nit: move this comment into the `if` ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); + // If both topic IDs were valid and the topic ID changed, update the metadata + } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) { Review comment: Hmm, shouldn't this check come before the epoch check? Admittedly, it's unlikely that a recreated topic would have a higher epoch, but we may as well handle that case. By the way, it's a little inconsistent that this check uses both null and `Uuid.ZERO_UUID` to represent a missing value. Maybe we can use null consistently? ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -216,6 +217,14 @@ public synchronized boolean updateRequested() { } } + public synchronized Uuid topicId(String topicName) { Review comment: Can you document that this returns null if the topicId does not exist or is not known? Similarly for `topicName`. ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); Review comment: nit: `assertNull` ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); + assertTrue(cluster.topicIds().isEmpty()); + } + + @Test + public void testMetadataMergeOnIdDowngrade() { + Time time = new MockTime(); + Map<String, Uuid> topicIds = new HashMap<>(); + + final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) { + @Override + protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return retainTopics.get().contains(topic); + } + }; + + // Initialize a metadata instance with two topics. Both will be retained. + String clusterId = "clusterId"; + int nodes = 2; + Map<String, Integer> topicPartitionCounts = new HashMap<>(); + topicPartitionCounts.put("validTopic1", 2); + topicPartitionCounts.put("validTopic2", 3); + + retainTopics.set(new HashSet<>(Arrays.asList( Review comment: nit: you can use `Utils.mkSet` (a few more of these) ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); + // If both topic IDs were valid and the topic ID changed, update the metadata + } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) { + log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " + + "Using the newly updated metadata.", tp, oldTopicId, topicId); Review comment: Instead of "Using the newly updated metadata," maybe we can say this: > Resetting the last seen epoch to {}. ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -371,6 +372,43 @@ public void testUpdateLastEpoch() { assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); } + @Test + public void testEpochUpdateOnChangedTopicIds() { + TopicPartition tp = new TopicPartition("topic-1", 0); + Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + + MetadataResponse metadataResponse = emptyMetadataResponse(); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + + // Start with a topic with no topic ID + metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases. + // Don't update to an older one + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); Review comment: nit: this seems more concise ```java assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); ``` ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -371,6 +372,43 @@ public void testUpdateLastEpoch() { assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); } + @Test + public void testEpochUpdateOnChangedTopicIds() { + TopicPartition tp = new TopicPartition("topic-1", 0); + Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + + MetadataResponse metadataResponse = emptyMetadataResponse(); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + + // Start with a topic with no topic ID + metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases. + // Don't update to an older one + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // Don't cause update if it's the same one + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10)); + + // Update if we see newer epoch + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L); + assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); + + // We should also update if we see a new topicId even if the epoch is lower Review comment: We may as well cover the case when the topicId is changed _and_ the epoch is higher. ########## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ########## @@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId, Set<String> addInvalidTopics, Set<String> addInternalTopics, Node newController, + Map<String, Uuid> topicIds, BiPredicate<String, Boolean> retainTopic) { Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic)); Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size()); + Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size()); + + // We want the most recent topic ID. We start with the previous ID stored for retained topics and then + // update with newest information in the MetadataResponse. Review comment: nit: "update with the newest information from the MetadataResponse." ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -863,8 +906,12 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { "keepValidTopic", "newValidTopic"))); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200); + topicIds.put("newValidTopic", Uuid.randomUuid()); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.remove("oldValidTopic"); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + assertEquals(metadata.topicId("oldValidTopic"), null); Review comment: nit: use `assertNull` ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, Set<String> invalidTopics = new HashSet<>(); List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>(); + Map<String, Uuid> topicIds = new HashMap<>(); for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { - topics.add(metadata.topic()); + String topicName = metadata.topic(); + Uuid topicId = metadata.topicId(); + topics.add(topicName); + // We only update if the current metadata since we can only compare when both topics have valid IDs + Uuid oldTopicId = null; + if (!topicId.equals(Uuid.ZERO_UUID)) { Review comment: nit: I think we're guaranteed that `topicId` is not null (in spite of the inconsistent `equals`), but it's still a little nicer to write this check as `!Uuid.ZERO_UUID.equals(topicId)` ########## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ########## @@ -69,13 +73,20 @@ private MetadataCache(String clusterId, Set<String> invalidTopics, Set<String> internalTopics, Node controller, + Map<String, Uuid> topicIds, Cluster clusterInstance) { this.clusterId = clusterId; this.nodes = nodes; this.unauthorizedTopics = unauthorizedTopics; this.invalidTopics = invalidTopics; this.internalTopics = internalTopics; this.controller = controller; + this.topicIds = topicIds; + + this.topicNames = new HashMap<>(topicIds.size()); Review comment: As far as I can tell, there are no uses of this collection (`Metadata.topicName` is not referenced). Can we get rid of it? ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, Set<String> invalidTopics = new HashSet<>(); List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>(); + Map<String, Uuid> topicIds = new HashMap<>(); for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { - topics.add(metadata.topic()); + String topicName = metadata.topic(); + Uuid topicId = metadata.topicId(); + topics.add(topicName); + // We only update if the current metadata since we can only compare when both topics have valid IDs Review comment: nit: rephrase? ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic"))); assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); Review comment: Do we want to make this assertion stronger? Or is `topicIds` a subset? ########## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ########## @@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId, Set<String> addInvalidTopics, Set<String> addInternalTopics, Node newController, + Map<String, Uuid> topicIds, BiPredicate<String, Boolean> retainTopic) { Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic)); Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size()); + Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size()); + + // We want the most recent topic ID. We start with the previous ID stored for retained topics and then + // update with newest information in the MetadataResponse. + // If the newest MetadataResponse: + // - contains a new topic with no ID, add no IDs to newTopicIds Review comment: nit: some of these cases do not seem worth mentioning. I think we're just saying that we always take the latest state, removing existing topicIds if necessary. ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); + assertTrue(cluster.topicIds().isEmpty()); + } + + @Test + public void testMetadataMergeOnIdDowngrade() { + Time time = new MockTime(); + Map<String, Uuid> topicIds = new HashMap<>(); + + final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) { + @Override + protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return retainTopics.get().contains(topic); + } + }; + + // Initialize a metadata instance with two topics. Both will be retained. + String clusterId = "clusterId"; + int nodes = 2; + Map<String, Integer> topicPartitionCounts = new HashMap<>(); + topicPartitionCounts.put("validTopic1", 2); + topicPartitionCounts.put("validTopic2", 3); + + retainTopics.set(new HashSet<>(Arrays.asList( + "validTopic1", + "validTopic2"))); + + topicIds.put("validTopic1", Uuid.randomUuid()); + topicIds.put("validTopic2", Uuid.randomUuid()); + MetadataResponse metadataResponse = + RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP) + topicIds.remove("validTopic1"); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + Cluster cluster = metadata.fetch(); + // We still have the topic, but it just doesn't have an ID. + assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2"))); Review comment: nit: the first argument should be the expected one ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); + assertTrue(cluster.topicIds().isEmpty()); + } + + @Test + public void testMetadataMergeOnIdDowngrade() { + Time time = new MockTime(); + Map<String, Uuid> topicIds = new HashMap<>(); + + final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) { + @Override + protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return retainTopics.get().contains(topic); + } + }; + + // Initialize a metadata instance with two topics. Both will be retained. + String clusterId = "clusterId"; + int nodes = 2; + Map<String, Integer> topicPartitionCounts = new HashMap<>(); + topicPartitionCounts.put("validTopic1", 2); + topicPartitionCounts.put("validTopic2", 3); + + retainTopics.set(new HashSet<>(Arrays.asList( + "validTopic1", + "validTopic2"))); + + topicIds.put("validTopic1", Uuid.randomUuid()); + topicIds.put("validTopic2", Uuid.randomUuid()); + MetadataResponse metadataResponse = + RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP) + topicIds.remove("validTopic1"); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + Cluster cluster = metadata.fetch(); + // We still have the topic, but it just doesn't have an ID. + assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2"))); + assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); Review comment: nit: I think we can make this assertion stronger ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); - metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); + assertTrue(cluster.topicIds().isEmpty()); + } + + @Test + public void testMetadataMergeOnIdDowngrade() { + Time time = new MockTime(); + Map<String, Uuid> topicIds = new HashMap<>(); + + final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) { + @Override + protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { + return retainTopics.get().contains(topic); + } + }; + + // Initialize a metadata instance with two topics. Both will be retained. + String clusterId = "clusterId"; + int nodes = 2; + Map<String, Integer> topicPartitionCounts = new HashMap<>(); + topicPartitionCounts.put("validTopic1", 2); + topicPartitionCounts.put("validTopic2", 3); + + retainTopics.set(new HashSet<>(Arrays.asList( + "validTopic1", + "validTopic2"))); + + topicIds.put("validTopic1", Uuid.randomUuid()); + topicIds.put("validTopic2", Uuid.randomUuid()); + MetadataResponse metadataResponse = + RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP) + topicIds.remove("validTopic1"); + metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); + retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic))); + + Cluster cluster = metadata.fetch(); + // We still have the topic, but it just doesn't have an ID. + assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2"))); + assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2); + assertTrue(cluster.topicIds().containsAll(topicIds.values())); + assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1")); + } + Review comment: nit: extra newline (and above before the end brace) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org