[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670769638



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If both topic IDs were valid and the topic ID changed, update the metadata
+} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
   Ah ok. I'll switch it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670658031



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
 assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
 assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
 assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
   The collection types are different, so I made a helper method that checks containsAll in both directions. There may be a similar helper elsewhere in the code, but I couldn't find it.
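A minimal sketch of such a helper, using only the JDK. The name `assertContainsAllBothWays` is hypothetical and is not the helper added in the PR. It runs `containsAll` in both directions, so it works even when one side is a `Map.values()` view and the other is a `Set`.

```java
import java.util.Collection;

// Hypothetical helper, not the one added in the PR.
final class CollectionAsserts {
    private CollectionAsserts() { }

    // Fails unless each collection contains every element of the other.
    // Works across collection types (e.g. a HashMap.values() view versus a Set),
    // where equals() cannot be relied upon. Ordering and multiplicity are ignored.
    static void assertContainsAllBothWays(Collection<?> expected, Collection<?> actual) {
        if (!actual.containsAll(expected))
            throw new AssertionError("Missing elements: expected " + expected + ", actual " + actual);
        if (!expected.containsAll(actual))
            throw new AssertionError("Unexpected elements: expected " + expected + ", actual " + actual);
    }
}
```

With this helper, the assertion above would read `assertContainsAllBothWays(topicIds.values(), cluster.topicIds());`. Because the check relies on `containsAll`, it cannot detect an element that is duplicated on one side. That is acceptable here, since every topic ID should be unique.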








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670630904



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+@Test
+public void testEpochUpdateOnChangedTopicIds() {
+TopicPartition tp = new TopicPartition("topic-1", 0);
+Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+MetadataResponse metadataResponse = emptyMetadataResponse();
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+// Start with a topic with no topic ID
+metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+// We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+// Don't update to an older one
+metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));

Review comment:
   Should I update this for the test above as well?








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670562472



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
 assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
 assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
 assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
 // Perform another metadata update, but this time all topic metadata should be cleared.
 retainTopics.set(Collections.emptySet());
 
-metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
 metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
 cluster = metadata.fetch();
 assertEquals(cluster.clusterResource().clusterId(), newClusterId);
 assertEquals(cluster.nodes().size(), newNodes);
 assertEquals(cluster.invalidTopics(), Collections.emptySet());
 assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
 assertEquals(cluster.topics(), Collections.emptySet());
+assertTrue(cluster.topicIds().isEmpty());
+}
+
+@Test
+public void testMetadataMergeOnIdDowngrade() {
+Time time = new MockTime();
+Map<String, Uuid> topicIds = new HashMap<>();
+
+final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+@Override
+protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+return retainTopics.get().contains(topic);
+}
+};
+
+// Initialize a metadata instance with two topics. Both will be retained.
+String clusterId = "clusterId";
+int nodes = 2;
+Map<String, Integer> topicPartitionCounts = new HashMap<>();
+topicPartitionCounts.put("validTopic1", 2);
+topicPartitionCounts.put("validTopic2", 3);
+
+retainTopics.set(new HashSet<>(Arrays.asList(
+"validTopic1",
+"validTopic2")));
+
+topicIds.put("validTopic1", Uuid.randomUuid());
+topicIds.put("validTopic2", Uuid.randomUuid());
+MetadataResponse metadataResponse =
+RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+// Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+topicIds.remove("validTopic1");
+metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+Cluster cluster = metadata.fetch();
+// We still have the topic, but it just doesn't have an ID.
+assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));

Review comment:
   I can switch it, but I was trying to keep the ordering consistent within the file. For example, the test above does the same thing:
   `assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));`








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670560488



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
 assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
 assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
 assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
   To make it stronger, should we also assert the inverse, `topicIds.values().containsAll(cluster.topicIds())`? Or am I being silly, and we can just check equality here?
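Plain equality probably will not work here. Assuming `cluster.topicIds()` returns a `Map.values()`-style collection (not confirmed in this thread), the comparison breaks down: a `HashMap.values()` view does not override `equals`, so it falls back to reference identity, and `Set.equals` rejects anything that is not a `Set`. The sketch below uses `java.util.UUID` as a stand-in for Kafka's `Uuid` to show the difference. Copying both sides into a `HashSet` gives an element-wise comparison.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;

final class ValuesEqualityDemo {
    private ValuesEqualityDemo() { }

    // HashMap.values() inherits Object.equals, so this is an identity check
    // and returns false even when the elements match.
    static boolean naiveEquals(Map<String, UUID> ids, Collection<UUID> other) {
        return ids.values().equals(other);
    }

    // Element-wise comparison. The copy drops duplicates, which is harmless
    // when every topic ID is unique.
    static boolean setEquals(Collection<UUID> a, Collection<UUID> b) {
        return new HashSet<>(a).equals(new HashSet<>(b));
    }
}
```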








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670559157



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If both topic IDs were valid and the topic ID changed, update the metadata
+} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
   Also, both branches have the same effect: the new metadata is used. The only difference is the debug log line. If it makes more sense to log the topic ID change, I can switch the order.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670558097



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+// update with newest information in the MetadataResponse.
+// If the newest MetadataResponse:
+//- contains a new topic with no ID, add no IDs to newTopicIds

Review comment:
   Sure. That's easier to understand, I think.
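The merge order described in that comment can be sketched as follows. This is a simplified, hypothetical version of `mergeWith` that only tracks topic IDs, with `java.util.UUID` standing in for Kafka's `Uuid`. Retained topics start with their previous ID. Each topic present in the latest response then either takes its new ID or, if the response carries none, loses the old one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical simplification of MetadataCache.mergeWith, topic IDs only.
final class TopicIdMerge {
    private TopicIdMerge() { }

    static Map<String, UUID> merge(Map<String, UUID> previousIds,
                                   Predicate<String> shouldRetainTopic,
                                   List<String> topicsInResponse,
                                   Map<String, UUID> idsInResponse) {
        Map<String, UUID> merged = new HashMap<>();
        // 1. Start from the previous IDs of the topics we are keeping.
        previousIds.forEach((topic, id) -> {
            if (shouldRetainTopic.test(topic))
                merged.put(topic, id);
        });
        // 2. The latest response wins for every topic it mentions.
        for (String topic : topicsInResponse) {
            UUID id = idsInResponse.get(topic);
            if (id != null)
                merged.put(topic, id);   // newest ID replaces any older one
            else
                merged.remove(topic);    // response has no ID for this topic (e.g. older IBP)
        }
        return merged;
    }
}
```

A retained topic that does not appear in the response keeps its previous ID. Only a topic that is reported *without* an ID loses it.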








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670557719



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -69,13 +73,20 @@ private MetadataCache(String clusterId,
   Set<String> invalidTopics,
   Set<String> internalTopics,
   Node controller,
+  Map<String, Uuid> topicIds,
   Cluster clusterInstance) {
 this.clusterId = clusterId;
 this.nodes = nodes;
 this.unauthorizedTopics = unauthorizedTopics;
 this.invalidTopics = invalidTopics;
 this.internalTopics = internalTopics;
 this.controller = controller;
+this.topicIds = topicIds;
+
+this.topicNames = new HashMap<>(topicIds.size());

Review comment:
   sure








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670557519



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If both topic IDs were valid and the topic ID changed, update the metadata
+} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
+log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+"Using the newly updated metadata.", tp, oldTopicId, topicId);

Review comment:
   Part of the reason I say this is that, in general, this method decides whether we use or ignore the new metadata when updating the cache. But I can change it to the epoch message.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670553966



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
 Set<String> invalidTopics = new HashSet<>();
 
 List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+Map<String, Uuid> topicIds = new HashMap<>();
 for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-topics.add(metadata.topic());
+String topicName = metadata.topic();
+Uuid topicId = metadata.topicId();
+topics.add(topicName);
+// We only update if the current metadata since we can only compare when both topics have valid IDs
+Uuid oldTopicId = null;
+if (!topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
   Yeah. There cannot be a null UUID in the request, but I can rephrase.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670554176



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
 }
 }
 
+public synchronized Uuid topicId(String topicName) {

Review comment:
   Will do








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-15 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670553369



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If both topic IDs were valid and the topic ID changed, update the metadata
+} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
   This bugged me a bit too. The issue is that the request itself uses 
Uuid.ZERO_UUID, so we'd just have to convert that to null. We can do that if it 
is clearer to read.
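Here is a sketch of that conversion. It assumes the sentinel is the all-zero UUID, uses `java.util.UUID` in place of Kafka's `Uuid`, and the class and method names are hypothetical. Normalizing once, where the response is read, means later checks only have to compare against `null`.

```java
import java.util.UUID;

final class TopicIdNormalizer {
    // Stand-in for Kafka's Uuid.ZERO_UUID: the all-zero value a response
    // uses when it carries no topic ID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private TopicIdNormalizer() { }

    // Map the wire sentinel to null once, at the boundary.
    static UUID fromWire(UUID wireId) {
        return ZERO_UUID.equals(wireId) ? null : wireId;
    }

    // After normalization, "the topic was recreated" reduces to:
    // both IDs are known and they differ.
    static boolean topicIdChanged(UUID newId, UUID oldId) {
        return newId != null && oldId != null && !newId.equals(oldId);
    }
}
```

This adds one conversion step at the boundary. In exchange, there is no need to remember the zero-UUID case at every comparison site.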








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669899334



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+// we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey())) {
+newTopicIds.put(entry.getKey(), entry.getValue());
+}
+}
+
 for (PartitionMetadata partition : addPartitions) {
 newMetadataByPartition.put(partition.topicPartition, partition);
+Uuid id = topicIds.get(partition.topic());
+if (id != null)
+newTopicIds.put(partition.topic(), id);
+else
+// Remove if the latest metadata does not have a topic ID

Review comment:
   Yeah. That was my reasoning. I thought the upgrade/downgrade case would 
be rare and the guarantees harder to reason about there.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669815073



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
   Yeah, I was thinking that too. I just have to remember the zero UUID case when comparing.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-13 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669069127



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
   Hmm, I don't actually have access to both topic IDs in this method. I can log just the old one and/or pass in the new one as a parameter.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-13 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669065110



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If the topic ID changed, updated the metadata
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +
+"Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+lastSeenLeaderEpochs.put(tp, newEpoch);
+return Optional.of(partitionMetadata);

Review comment:
   I see. I think the main issue here was that we would ignore metadata 
updates when we were simply looking at the epoch. I believe that this PR solves 
the problem, but we can continue to improve beyond this. 
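The two acceptance rules differ in only a few lines. This is a hypothetical sketch, not the code in `Metadata.java`. It assumes the caller has already computed whether the topic ID changed, and that a missing ID on either side counts as "not changed".

```java
// Hypothetical illustration of the two acceptance rules.
final class LeaderEpochGate {
    private LeaderEpochGate() { }

    // Epoch-only rule: accept new partition metadata unless the leader epoch went
    // backwards. A topic deleted and recreated under the same name typically reports
    // a lower epoch, so its fresh metadata would be ignored under this rule.
    static boolean acceptEpochOnly(Integer currentEpoch, int newEpoch) {
        return currentEpoch == null || newEpoch >= currentEpoch;
    }

    // Rule discussed in this PR: a changed topic ID also accepts the update.
    // topicIdChanged is assumed to be false when either ID is missing (ZERO_UUID or null).
    static boolean acceptWithTopicId(Integer currentEpoch, int newEpoch, boolean topicIdChanged) {
        return acceptEpochOnly(currentEpoch, newEpoch) || topicIdChanged;
    }
}
```

Under these rules, `acceptEpochOnly(10, 1)` rejects the update and `acceptWithTopicId(10, 1, true)` accepts it. `acceptWithTopicId(10, 1, false)` still rejects it, which matches `testEpochUpdateOnChangedTopicIds`: there, a topic only gains an ID while reporting an older epoch.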








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-13 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669064028



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+// we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey())) {
+newTopicIds.put(entry.getKey(), entry.getValue());
+}
+}
+
 for (PartitionMetadata partition : addPartitions) {
 newMetadataByPartition.put(partition.topicPartition, partition);
+Uuid id = topicIds.get(partition.topic());
+if (id != null)
+newTopicIds.put(partition.topic(), id);
+else
+// Remove if the latest metadata does not have a topic ID

Review comment:
   I suppose it is not needed, but I'm not sure if it helps a lot to remove 
it.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-07 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665654598



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
 return Optional.ofNullable(metadataByPartition.get(topicPartition));
 }
 
+Map<String, Uuid> topicIds() {

Review comment:
   It is used in Fetcher to get all the topic IDs to put into the fetch request/session. Maybe it is OK to call a per-topic method multiple times there instead. I also use it in tests, but we could change that usage.








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-07 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665652021



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
 return Optional.ofNullable(metadataByPartition.get(topicPartition));
 }
 
+Map<String, Uuid> topicIds() {

Review comment:
   Hmm. I suppose we could have lookup methods. This has implications for 
the Fetch PR though. 
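Here is one possible shape for lookup methods, sketched with hypothetical names and `java.util.UUID` in place of Kafka's `Uuid`. The cache keeps the name-to-ID map plus a reverse ID-to-name map built once in the constructor. Callers ask for one topic at a time instead of receiving the whole map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical cache shape: point lookups instead of exposing the full map.
final class TopicIdLookup {
    private final Map<String, UUID> topicIds;
    private final Map<UUID, String> topicNames;

    TopicIdLookup(Map<String, UUID> topicIds) {
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
        // Build the reverse index once so ID-to-name lookups stay O(1).
        Map<UUID, String> names = new HashMap<>(topicIds.size());
        topicIds.forEach((name, id) -> names.put(id, name));
        this.topicNames = Collections.unmodifiableMap(names);
    }

    // Returns null when the topic is unknown or has no ID.
    UUID topicId(String topicName) {
        return topicIds.get(topicName);
    }

    // Returns null when no cached topic has this ID.
    String topicName(UUID topicId) {
        return topicNames.get(topicId);
    }
}
```

A fetch path that needs every ID would call `topicId` once per partition's topic. That is the "call a method multiple times" trade-off mentioned above.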








[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-07 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665649142



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+// we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey())) {
+newTopicIds.put(entry.getKey(), entry.getValue());
+}
+}
+
 for (PartitionMetadata partition : addPartitions) {
 newMetadataByPartition.put(partition.topicPartition, partition);
+Uuid id = topicIds.get(partition.topic());
+if (id != null)
+newTopicIds.put(partition.topic(), id);
+else
+// Remove if the latest metadata does not have a topic ID

Review comment:
   Yes, for the fetch path, we want to know when topic IDs are removed as 
quickly as possible so we can switch over to the older fetch version that uses 
topic names.
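Here is a sketch of how a fetch path might react once an ID disappears from metadata. It rests on stated assumptions. The enum values are hypothetical labels, not real request versions. The rule that one topic without an ID forces the whole request back to topic names is also an assumption, not something confirmed in this thread. `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

// Hypothetical: choose between an ID-based and a name-based fetch.
final class FetchVersionChooser {
    // Illustrative labels only; real request versions are not named here.
    enum FetchMode { TOPIC_IDS, TOPIC_NAMES }

    private FetchVersionChooser() { }

    // Use the ID-based fetch only if every fetched topic currently has an ID.
    // As soon as merged metadata drops an ID (e.g. after a downgrade), fall back to names.
    static FetchMode choose(Collection<String> fetchTopics, Map<String, UUID> topicIds) {
        for (String topic : fetchTopics) {
            if (topicIds.get(topic) == null)
                return FetchMode.TOPIC_NAMES;
        }
        return FetchMode.TOPIC_IDS;
    }
}
```

This is also why removing a stale ID during the merge matters. If the old ID lingered, this check would keep choosing the ID-based path for a topic the latest metadata reported without an ID.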



