dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r742933244



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = Uuid.randomUuid();
+        builder.add(tp,
+                new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertTrue(data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+        handler.handleResponse(resp, (short) 12);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = Uuid.randomUuid();
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+                data2.toSend(), data2.sessionPartitions());
+        assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+        // Should have the same session ID, and next epoch and can use topic 
IDs.
+        assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session");
+        assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch");
+        assertTrue(data2.canUseTopicIds());
+
+        short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 
12;
+        FetchRequest fetchRequest = FetchRequest.Builder
+                .forReplica(version, 0, 1, 1, data2.toSend())
+                .removed(data2.toForget())
+                .replaced(data2.toReplace())
+                .metadata(data2.metadata()).build(version);
+
+        assertEquals(fetchRequestUsesIds, 
fetchRequest.data().forgottenTopicsData().size() > 0);
+        assertEquals(1, fetchRequest.data().topics().size());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean 
startsWithTopicIds) {
+        Uuid fooId = startsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+        Uuid barId = startsWithTopicIds ? Uuid.ZERO_UUID : Uuid.randomUuid();
+        short responseVersion = startsWithTopicIds ? 
ApiKeys.FETCH.latestVersion() : 12;
+
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("bar", 1);
+
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        builder.add(tp0,
+                new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, fooId, 10, 20)));
+        handler.handleResponse(resp, responseVersion);
+
+        // Re-add the first partition. Then add a partition with opposite ID 
usage.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        builder2.add(tp0,
+                new FetchRequest.PartitionData(fooId, 10, 110, 210, 
Optional.empty()));
+        builder2.add(tp1,
+                new FetchRequest.PartitionData(barId, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // Should have the same session ID, and the next epoch and can not use 
topic IDs.
+        // The receiving broker will handle closing the session.
+        assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session");
+        assertEquals(1, data2.metadata().epoch(), "Did not have final epoch");
+        assertFalse(data2.canUseTopicIds());
+    }
+
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) {
         // We want to test when all topics are removed from the session
         Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
-        Short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
-        Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId);
+        short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
         FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 
         // Add topic foo to the session
         FetchSessionHandler.Builder builder = handler.newBuilder();
-        builder.add(new TopicPartition("foo", 0), topicIds.get("foo"),
-                new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+        builder.add(new TopicPartition("foo", 0),
+                new FetchRequest.PartitionData(topicId, 0, 100, 200, 
Optional.empty()));
         FetchSessionHandler.FetchRequestData data = builder.build();
-        assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId, 0, 0, 100, 200)),
                 data.toSend(), data.sessionPartitions());
         assertTrue(data.metadata().isFull());
         assertEquals(useTopicIds, data.canUseTopicIds());
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
-        handler.handleResponse(resp, responseVersion.shortValue());
+                respMap(new RespEntry("foo", 0, topicId, 10, 20)));
+        handler.handleResponse(resp, responseVersion);
 
         // Remove the topic from the session
         FetchSessionHandler.Builder builder2 = handler.newBuilder();
         FetchSessionHandler.FetchRequestData data2 = builder2.build();
-        // Should have the same session ID and next epoch, but can no longer 
use topic IDs.
-        // The receiving broker will close the session if we were previously 
using topic IDs.
+        // Should have the same session ID, next epoch, and same ID usage.
         assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when useTopicIds was " + useTopicIds);

Review comment:
       It is curious that we don't assert the forgotten partitions here. Is 
there a reason?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -212,6 +217,9 @@ private void assignFromUser(Set<TopicPartition> partitions) 
{
         
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy",
 1,
             Collections.emptyMap(), singletonMap(topicName, 4),
             tp -> validLeaderEpoch, topicIds), false, 0L);
+
+        metadata.fetch().nodes().forEach(node ->
+                apiVersions.update(node.idString(), NodeApiVersions.create()));

Review comment:
       Do we still need this change?

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -794,22 +793,23 @@ class ReplicaManagerTest {
 
       // We receive one valid request from the follower and replica state is 
updated
       var successfulFetch: Option[FetchPartitionData] = None
-      def callback(response: Seq[(TopicPartition, FetchPartitionData)]): Unit 
= {
-        successfulFetch = response.headOption.filter { case (topicPartition, 
_) => topicPartition == tp }.map { case (_, data) => data }
+      def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+        // Check the topic partition only since we are reusing this callback 
on different TopicIdPartitions.
+        successfulFetch = response.headOption.filter { case (topicIdPartition, 
_) => topicIdPartition.topicPartition == tidp.topicPartition }.map { case (_, 
data) => data }

Review comment:
       This is not ideal. Could we validate that the topic id is correct as 
well?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2659,6 +2661,9 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
                 autoCommitIntervalMs,
                 interceptors,
                 throwOnStableOffsetNotSupported);
+        ApiVersions apiVersions = new ApiVersions();
+        metadata.fetch().nodes().forEach(node ->
+                apiVersions.update(node.idString(), NodeApiVersions.create()));

Review comment:
       Do we still need this change?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = Uuid.randomUuid();
+        builder.add(tp,
+                new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertTrue(data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+        handler.handleResponse(resp, (short) 12);

Review comment:
       Why do we use 12 here?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -202,29 +203,30 @@ public void testSessionless() {
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
             addTopicId(topicIds, topicNames, "foo", version);
-            builder.add(new TopicPartition("foo", 0), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
-            builder.add(new TopicPartition("foo", 1), 
topicIds.getOrDefault("foo", Uuid.ZERO_UUID),
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));

Review comment:
       nit: Is it worth bringing back this line on the previous one as there is 
space now? It might be too long though.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = Uuid.randomUuid();
+        builder.add(tp,
+                new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertTrue(data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+        handler.handleResponse(resp, (short) 12);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = Uuid.randomUuid();
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+                data2.toSend(), data2.sessionPartitions());
+        assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+        // Should have the same session ID, and next epoch and can use topic 
IDs.
+        assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session");
+        assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch");
+        assertTrue(data2.canUseTopicIds());
+
+        short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 
12;

Review comment:
       Would it be more appropriate to move the above assertions to 
`FetchRequestTest`?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID, and next epoch and can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = Uuid.randomUuid();
+        builder.add(tp,
+                new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertTrue(data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+        handler.handleResponse(resp, (short) 12);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = Uuid.randomUuid();
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+                data2.toSend(), data2.sessionPartitions());
+        assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());

Review comment:
       Should we also test when the current topic-partition in the session does 
not have a topic id? In this case, it should not be added to the `toReplace` 
set.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2069,10 +2071,10 @@ public void testReturnRecordsDuringRebalance() throws 
InterruptedException {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
-        KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);
-
         initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), 
Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata, assignor, true, groupInstanceId);

Review comment:
       Is there any reason for this change?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -4814,6 +4842,7 @@ private void buildDependencies(MetricConfig metricConfig,
         metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
+        client.setNodeApiVersions(NodeApiVersions.create());
         metrics = new Metrics(metricConfig, time);
         consumerClient = new ConsumerNetworkClient(logContext, client, 
metadata, time,
                 100, 1000, Integer.MAX_VALUE);

Review comment:
       The PR changed how some errors are handled in the `Fetcher`. Do we have 
any tests for this new behavior?

##########
File path: core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
##########
@@ -137,11 +137,11 @@ class DelayedFetchTest extends EasyMockSupport {
 
     val fetchStatus = FetchPartitionStatus(
       startOffsetMetadata = LogOffsetMetadata(fetchOffset),
-      fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, 
maxBytes, currentLeaderEpoch, lastFetchedEpoch))
+      fetchInfo = new FetchRequest.PartitionData(topicIds.get("topic"), 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch))

Review comment:
       nit: It seems that we could use `TopicIdPartition` directly and remove 
`topicIds` map entirely. We could also pass the `TopicIdPartition` to 
`buildFetchMetadata`.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -465,18 +477,18 @@ public void testFetchError() {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fullFetchResponse(tp0, this.records, 
Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, 
Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchedRecords();
         assertFalse(partitionRecords.containsKey(tp0));
     }
 
-    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, 
final long offset) {
+    private MockClient.RequestMatcher matchesOffset(final TopicIdPartition tp, 
final long offset) {
         return body -> {
             FetchRequest fetch = (FetchRequest) body;
-            Map<TopicPartition, FetchRequest.PartitionData> fetchData =  
fetch.fetchData(topicNames);
+            Map<TopicIdPartition, FetchRequest.PartitionData> fetchData =  
fetch.fetchData(topicNames);

Review comment:
       nit: There are two spaces after `=`.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -157,38 +156,38 @@ class FetchSessionTest {
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
 
-    val tp0 = new TopicPartition("foo", 0)
-    val tp1 = new TopicPartition("foo", 1)
-    val tp2 = new TopicPartition("bar", 1)
     val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> 
Uuid.randomUuid()).asJava
+    val tp0 = new TopicIdPartition(topicIds.get("foo"), new 
TopicPartition("foo", 0))
+    val tp1 = new TopicIdPartition(topicIds.get("foo"), new 
TopicPartition("foo", 1))
+    val tp2 = new TopicIdPartition(topicIds.get("bar"), new 
TopicPartition("bar", 1))
     val topicNames = topicIds.asScala.map(_.swap).asJava
 
-    def cachedLeaderEpochs(context: FetchContext): Map[TopicPartition, 
Optional[Integer]] = {
-      val mapBuilder = Map.newBuilder[TopicPartition, Optional[Integer]]
-      context.foreachPartition((tp, _, data) => mapBuilder += tp -> 
data.currentLeaderEpoch)
+    def cachedLeaderEpochs(context: FetchContext): Map[TopicIdPartition, 
Optional[Integer]] = {
+      val mapBuilder = Map.newBuilder[TopicIdPartition, Optional[Integer]]
+      context.foreachPartition((tp, data) => mapBuilder += tp -> 
data.currentLeaderEpoch)
       mapBuilder.result()
     }
 
     val requestData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    requestData1.put(tp0, new FetchRequest.PartitionData(0, 0, 100, 
Optional.empty()))
-    requestData1.put(tp1, new FetchRequest.PartitionData(10, 0, 100, 
Optional.of(1)))
-    requestData1.put(tp2, new FetchRequest.PartitionData(10, 0, 100, 
Optional.of(2)))
+    requestData1.put(tp0.topicPartition, new 
FetchRequest.PartitionData(topicIds.get("foo"), 0, 0, 100, Optional.empty()))
+    requestData1.put(tp1.topicPartition, new 
FetchRequest.PartitionData(topicIds.get("foo"), 10, 0, 100, Optional.of(1)))
+    requestData1.put(tp2.topicPartition, new 
FetchRequest.PartitionData(topicIds.get("bar"), 10, 0, 100, Optional.of(2)))

Review comment:
       nit: We could get the topic id from `tp*.topicId`.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+    val topicIds = topicNames.asScala.map(_.swap).asJava
+    val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+    val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+    val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+    val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-    // Create a new fetch session with foo-0
+    // Create a new fetch session with foo-0 and foo-1
     val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+    reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
       Optional.empty()))
-    val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-    // Start a fetch session using a request version that does not use topic 
IDs.
+    reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    // Simulate unknown topic ID for foo.
+    val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+    // We should not throw error since we have an older request version.
     val context1 = fetchManager.newContext(
       request1.version,
       request1.metadata,
       request1.isFromFollower,
-      request1.fetchData(topicNames),
-      request1.forgottenTopics(topicNames),
-      topicIds
+      request1.fetchData(topicNamesOnlyBar),
+      request1.forgottenTopics(topicNamesOnlyBar),
+      topicNamesOnlyBar
     )
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]

Review comment:
       Should we iterate over the partitions in the context to check the 
`TopicIdPartition`?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -222,6 +230,9 @@ private void assignFromUserNoId(Set<TopicPartition> 
partitions) {
         metadata.update(9, RequestTestUtils.metadataUpdateWithIds("dummy", 1,
             Collections.emptyMap(), singletonMap("noId", 1),
             tp -> validLeaderEpoch, topicIds), false, 0L);
+
+        metadata.fetch().nodes().forEach(node ->
+                apiVersions.update(node.idString(), NodeApiVersions.create()));

Review comment:
       ditto. There is a few other cases in this file.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+    val topicIds = topicNames.asScala.map(_.swap).asJava
+    val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+    val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+    val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+    val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-    // Create a new fetch session with foo-0
+    // Create a new fetch session with foo-0 and foo-1
     val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+    reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
       Optional.empty()))
-    val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-    // Start a fetch session using a request version that does not use topic 
IDs.
+    reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    // Simulate unknown topic ID for foo.
+    val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+    // We should not throw error since we have an older request version.
     val context1 = fetchManager.newContext(
       request1.version,
       request1.metadata,
       request1.isFromFollower,
-      request1.fetchData(topicNames),
-      request1.forgottenTopics(topicNames),
-      topicIds
+      request1.fetchData(topicNamesOnlyBar),
+      request1.forgottenTopics(topicNamesOnlyBar),
+      topicNamesOnlyBar
     )
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    respData1.put(emptyFoo0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
-      .setHighWatermark(100)
-      .setLastStableOffset(100)
-      .setLogStartOffset(100))
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+    respData1.put(emptyFoo1, new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
     val resp1 = context1.updateAndGenerateResponseData(respData1)
+    // On the latest request version, we should have unknown topic ID errors.
     assertEquals(Errors.NONE, resp1.error())
     assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    assertEquals(2, resp1.responseData(topicNames, request1.version).size)
+    resp1.responseData(topicNames, request1.version).forEach( (_, resp) => 
assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode))
 
-    // Create an incremental fetch request as though no topics changed. 
However, send a v13 request.
-    // Also simulate the topic ID found on the server.
-    val fooId = Uuid.randomUuid()
-    topicIds.put("foo", fooId)
-    topicNames.put(fooId, "foo")
+    // Create an incremental request where we resolve the partitions
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNames),
       request2.forgottenTopics(topicNames),
-      topicIds
+      topicNames
     )
-
-    assertEquals(classOf[SessionErrorContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR,
-      context2.updateAndGenerateResponseData(respData2).error())
+    // run through each partition to resolve them
+    context2.foreachPartition((_, _) => ())

Review comment:
       Should we assert that the `TopicIdPartition` received here contains the 
topic name?

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+    val topicIds = topicNames.asScala.map(_.swap).asJava
+    val foo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 0))
+    val foo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition("foo", 1))
+    val emptyFoo0 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 0))
+    val emptyFoo1 = new TopicIdPartition(topicIds.getOrDefault("foo", 
Uuid.ZERO_UUID), new TopicPartition(null, 1))
 
-    // Create a new fetch session with foo-0
+    // Create a new fetch session with foo-0 and foo-1
     val reqData1 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    reqData1.put(new TopicPartition("foo", 0), new 
FetchRequest.PartitionData(0, 0, 100,
+    reqData1.put(foo0.topicPartition, new 
FetchRequest.PartitionData(foo0.topicId, 0, 0, 100,
       Optional.empty()))
-    val request1 = createRequestWithoutTopicIds(JFetchMetadata.INITIAL, 
reqData1, topicIds, EMPTY_PART_LIST, false)
-    // Start a fetch session using a request version that does not use topic 
IDs.
+    reqData1.put(foo1.topicPartition, new 
FetchRequest.PartitionData(foo1.topicId,10, 0, 100,
+      Optional.empty()))
+    val request1 = createRequest(JFetchMetadata.INITIAL, reqData1, 
EMPTY_PART_LIST, false)
+    // Simulate unknown topic ID for foo.
+    val topicNamesOnlyBar = Collections.singletonMap(topicIds.get("bar"), 
"bar")
+    // We should not throw error since we have an older request version.
     val context1 = fetchManager.newContext(
       request1.version,
       request1.metadata,
       request1.isFromFollower,
-      request1.fetchData(topicNames),
-      request1.forgottenTopics(topicNames),
-      topicIds
+      request1.fetchData(topicNamesOnlyBar),
+      request1.forgottenTopics(topicNamesOnlyBar),
+      topicNamesOnlyBar
     )
     assertEquals(classOf[FullFetchContext], context1.getClass)
-    val respData1 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    respData1.put(new TopicPartition("foo", 0), new 
FetchResponseData.PartitionData()
+    val respData1 = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData]
+    respData1.put(emptyFoo0, new FetchResponseData.PartitionData()
       .setPartitionIndex(0)
-      .setHighWatermark(100)
-      .setLastStableOffset(100)
-      .setLogStartOffset(100))
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
+    respData1.put(emptyFoo1, new FetchResponseData.PartitionData()
+      .setPartitionIndex(1)
+      .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code))
     val resp1 = context1.updateAndGenerateResponseData(respData1)
+    // On the latest request version, we should have unknown topic ID errors.
     assertEquals(Errors.NONE, resp1.error())
     assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
+    assertEquals(2, resp1.responseData(topicNames, request1.version).size)
+    resp1.responseData(topicNames, request1.version).forEach( (_, resp) => 
assertEquals(Errors.UNKNOWN_TOPIC_ID.code, resp.errorCode))
 
-    // Create an incremental fetch request as though no topics changed. 
However, send a v13 request.
-    // Also simulate the topic ID found on the server.
-    val fooId = Uuid.randomUuid()
-    topicIds.put("foo", fooId)
-    topicNames.put(fooId, "foo")
+    // Create an incremental request where we resolve the partitions
     val reqData2 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, topicIds, EMPTY_PART_LIST, false)
+    val request2 = createRequest(new JFetchMetadata(resp1.sessionId(), 1), 
reqData2, EMPTY_PART_LIST, false)
     val context2 = fetchManager.newContext(
       request2.version,
       request2.metadata,
       request2.isFromFollower,
       request2.fetchData(topicNames),
       request2.forgottenTopics(topicNames),
-      topicIds
+      topicNames
     )
-
-    assertEquals(classOf[SessionErrorContext], context2.getClass)
-    val respData2 = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-    assertEquals(Errors.FETCH_SESSION_TOPIC_ID_ERROR,
-      context2.updateAndGenerateResponseData(respData2).error())
+    // run through each partition to resolve them

Review comment:
       nit: I would expand this comment a little and stress the fact that topic 
names are lazily resolved when the partitions are iterated over.

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -1091,18 +1089,20 @@ class AbstractFetcherThreadTest {
 
     override def buildFetch(partitionMap: Map[TopicPartition, 
PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
       val fetchData = mutable.Map.empty[TopicPartition, 
FetchRequest.PartitionData]
-      partitionMap.foreach { case (partition, state) =>
+      partitionMap.foreach { case (partition, state) => 0
+        .equals(0)

Review comment:
       `0.equals(0)` was very likely put here by mistake.

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava
+    val topicIds = topicNames.asScala.map(_.swap).asJava

Review comment:
       It seems to be that it would be simpler to declare `fooId` and `barId` 
and to use them instead of getting them from the map.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3530,37 +3534,37 @@ class KafkaApisTest {
   def testSizeOfThrottledPartitions(): Unit = {
     val topicNames = new util.HashMap[Uuid, String]
     val topicIds = new util.HashMap[String, Uuid]()
-    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse = {
-      val responseData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData](
+    def fetchResponse(data: Map[TopicIdPartition, String]): FetchResponse = {
+      val responseData = new util.LinkedHashMap[TopicIdPartition, 
FetchResponseData.PartitionData](
         data.map { case (tp, raw) =>
           tp -> new FetchResponseData.PartitionData()
-            .setPartitionIndex(tp.partition)
+            .setPartitionIndex(tp.topicPartition.partition)
             .setHighWatermark(105)
             .setLastStableOffset(105)
             .setLogStartOffset(0)
             .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))))
       }.toMap.asJava)
 
       data.foreach{case (tp, _) =>
-        val id = Uuid.randomUuid()
-        topicIds.put(tp.topic(), id)
-        topicNames.put(id, tp.topic())
+        topicIds.put(tp.topicPartition.topic, tp.topicId)
+        topicNames.put(tp.topicId, tp.topicPartition.topic)
       }
-      FetchResponse.of(Errors.NONE, 100, 100, responseData, topicIds)
+      FetchResponse.of(Errors.NONE, 100, 100, responseData)
     }
 
-    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledPartition = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("throttledData", 0))
     val throttledData = Map(throttledPartition -> "throttledData")
     val expectedSize = 
FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
-      fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.iterator, topicIds)
+      fetchResponse(throttledData).responseData(topicNames, 
FetchResponseData.HIGHEST_SUPPORTED_VERSION).entrySet.asScala.map( entry =>
+      (new TopicIdPartition(Uuid.ZERO_UUID, entry.getKey), 
entry.getValue)).toMap.asJava.entrySet.iterator)
 
-    val response = fetchResponse(throttledData ++ Map(new 
TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+    val response = fetchResponse(throttledData ++ Map(new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("nonThrottledData", 0)) 
-> "nonThrottledData"))
 
     val quota = Mockito.mock(classOf[ReplicationQuotaManager])
     
Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
-      .thenAnswer(invocation => throttledPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
+      .thenAnswer(invocation => throttledPartition.topicPartition == 
invocation.getArgument(0).asInstanceOf[TopicPartition])
 
-    assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota, topicIds))
+    assertEquals(expectedSize, 
KafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
 response, quota))
   }
 
   @Test

Review comment:
       Should we add any tests for the new logic in KafkaApis?

##########
File path: core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
##########
@@ -659,88 +670,108 @@ class FetchSessionTest {
   }
 
   @Test
-  def testIncrementalFetchSessionWithIdsWhenSessionDoesNotUseIds() : Unit = {
+  def testFetchSessionWithUnknownId(): Unit = {
     val time = new MockTime()
     val cache = new FetchSessionCache(10, 1000)
     val fetchManager = new FetchManager(time, cache)
-    val topicIds = new util.HashMap[String, Uuid]()
-    val topicNames = new util.HashMap[Uuid, String]()
+    val topicNames = Map(Uuid.randomUuid() -> "foo", Uuid.randomUuid() -> 
"bar").asJava

Review comment:
       I wonder if we should add a third topic which is never resolved. What do 
you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to