hachikuji commented on a change in pull request #11004: URL: https://github.com/apache/kafka/pull/11004#discussion_r749668458
########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -372,6 +379,49 @@ public void testUpdateLastEpoch() { assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12)); } + @Test + public void testEpochUpdateOnChangedTopicIds() { + TopicPartition tp = new TopicPartition("topic-1", 0); + Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + + MetadataResponse metadataResponse = emptyMetadataResponse(); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L); + + // Start with a topic with no topic ID + metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L); + assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + + // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases. + // Don't update to an older one + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L); + assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + + // Don't cause update if it's the same one + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L); + assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp)); + + // Update if we see newer epoch + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L); + assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp)); + + // We should also update if we see a new topicId even if the epoch is lower + Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid()); + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, newTopicIds); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L); + assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp)); + + // Finally, update when the topic ID is new and the epoch is higher + Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid()); + metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds2); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L); + assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp)); + Review comment: nit: unneeded newline ########## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ########## @@ -375,17 +382,25 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) /** * Compute the latest partition metadata to cache given ordering by leader epochs (if both - * available and reliable). + * available and reliable) and whether the topic ID changed. */ private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata( MetadataResponse.PartitionMetadata partitionMetadata, - boolean hasReliableLeaderEpoch) { + boolean hasReliableLeaderEpoch, + Uuid topicId, + Uuid oldTopicId) { TopicPartition tp = partitionMetadata.topicPartition; if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) { int newEpoch = partitionMetadata.leaderEpoch.get(); - // If the received leader epoch is at least the same as the previous one, update the metadata Integer currentEpoch = lastSeenLeaderEpochs.get(tp); - if (currentEpoch == null || newEpoch >= currentEpoch) { + if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) { + // If both topic IDs were valid and the topic ID changed, update the metadata + log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " + Review comment: Can we change this level to INFO? ########## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ########## @@ -77,6 +79,11 @@ private static MetadataResponse emptyMetadataResponse() { Collections.emptyList()); } + private <T> void assertEqualCollections(Collection<T> expected, Collection<T> actual) { Review comment: nit: this name seems misleading since order could be important for arbitrary collections. Since we only have a couple uses, maybe we can get rid of it and use `assertEquals(new HashSet(A), new HashSet(B))` for example. ########## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ########## @@ -158,13 +148,14 @@ MetadataCache mergeWith(String newClusterId, Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size()); Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size()); - // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse - // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic. - for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) { - if (shouldRetainTopic.test(entry.getKey())) { - newTopicIds.put(entry.getKey(), entry.getValue()); + // We want the most recent topic ID. We start with the previous ID stored for retained topics and then + // update with newest information from the MetadataResponse. We always take the latest state, removing existing + // topic IDs if the latest state contains the topic name but not a topic ID. + this.topicIds.forEach((topicName, topicId) -> { Review comment: nit: simpler or not? ```java Map<String, Uuid> newTopicIds = topicIds.entrySet().stream() .filter(entry -> shouldRetainTopic.test(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ``` ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() { fetchable.put(node, builder); } - builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset, + Uuid topicId = metadata.topicId(partition.topic()); Review comment: It could be updated in a separate thread. I cannot see how that would be a problem though. We do have synchronization in `Metadata`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org