dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r743576928



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() {
         List<Integer> partitions = Arrays.asList(0, 1);
         partitions.forEach(partition -> {
             String testType = partition == 0 ? "updating a partition" : 
"adding a new partition";
-            Map<String, Uuid> topicIds = Collections.singletonMap("foo", 
Uuid.randomUuid());
+            Uuid fooId = Uuid.randomUuid();
             FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 
1);
             FetchSessionHandler.Builder builder = handler.newBuilder();
-            builder.add(new TopicPartition("foo", 0),  topicIds.get("foo"),
-                    new FetchRequest.PartitionData(0, 100, 200, 
Optional.empty()));
+            builder.add(new TopicPartition("foo", 0),
+                    new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
             FetchSessionHandler.FetchRequestData data = builder.build();
-            assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+            assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
                     data.toSend(), data.sessionPartitions());
             assertTrue(data.metadata().isFull());
             assertTrue(data.canUseTopicIds());
 
             FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                    respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
+                    respMap(new RespEntry("foo", 0, fooId, 10, 20)));
             handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());
 
             // Try to remove a topic ID from an existing topic partition (0) 
or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
-                    new FetchRequest.PartitionData(10, 110, 210, 
Optional.empty()));
+            builder2.add(new TopicPartition("foo", partition),
+                    new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 
210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
-            // Should have the same session ID and next epoch, but can no 
longer use topic IDs.
-            // The receiving broker will close the session if we were 
previously using topic IDs.
+            // Should have the same session ID and the next epoch, but can no 
longer use topic IDs.
+            // The receiving broker will handle closing the session.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when " + testType);
             assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testTopicIdReplaced(boolean fetchRequestUsesIds) {
+        TopicPartition tp = new TopicPartition("foo", 0);
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        Uuid topicId1 = Uuid.randomUuid();
+        builder.add(tp,
+                new FetchRequest.PartitionData(topicId1, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertTrue(data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20)));
+        handler.handleResponse(resp, (short) 12);
+
+        // Try to add a new topic ID.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        Uuid topicId2 = Uuid.randomUuid();
+        // Use the same data besides the topic ID.
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty());
+        builder2.add(tp, partitionData);
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // The old topic ID partition should be in toReplace, and the new one 
should be in toSend.
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)),
+                data2.toSend(), data2.sessionPartitions());
+        assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, 
tp)), data2.toReplace());
+        // Should have the same session ID and the next epoch, and can still use 
topic IDs.
+        assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session");
+        assertEquals(1, data2.metadata().epoch(), "Did not have correct 
epoch");
+        assertTrue(data2.canUseTopicIds());
+
+        short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 
12;
+        FetchRequest fetchRequest = FetchRequest.Builder
+                .forReplica(version, 0, 1, 1, data2.toSend())
+                .removed(data2.toForget())
+                .replaced(data2.toReplace())
+                .metadata(data2.metadata()).build(version);
+
+        assertEquals(fetchRequestUsesIds, 
fetchRequest.data().forgottenTopicsData().size() > 0);
+        assertEquals(1, fetchRequest.data().topics().size());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean 
startsWithTopicIds) {
+        Uuid fooId = startsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+        Uuid barId = startsWithTopicIds ? Uuid.ZERO_UUID : Uuid.randomUuid();
+        short responseVersion = startsWithTopicIds ? 
ApiKeys.FETCH.latestVersion() : 12;
+
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("bar", 1);
+
+        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+        FetchSessionHandler.Builder builder = handler.newBuilder();
+        builder.add(tp0,
+                new FetchRequest.PartitionData(fooId, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data = builder.build();
+        assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)),
+                data.toSend(), data.sessionPartitions());
+        assertTrue(data.metadata().isFull());
+        assertEquals(startsWithTopicIds, data.canUseTopicIds());
+
+        FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
+                respMap(new RespEntry("foo", 0, fooId, 10, 20)));
+        handler.handleResponse(resp, responseVersion);
+
+        // Re-add the first partition. Then add a partition with opposite ID 
usage.
+        FetchSessionHandler.Builder builder2 = handler.newBuilder();
+        builder2.add(tp0,
+                new FetchRequest.PartitionData(fooId, 10, 110, 210, 
Optional.empty()));
+        builder2.add(tp1,
+                new FetchRequest.PartitionData(barId, 0, 100, 200, 
Optional.empty()));
+        FetchSessionHandler.FetchRequestData data2 = builder2.build();
+        // Should have the same session ID and the next epoch, but cannot use 
topic IDs.
+        // The receiving broker will handle closing the session.
+        assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session");
+        assertEquals(1, data2.metadata().epoch(), "Did not have final epoch");
+        assertFalse(data2.canUseTopicIds());
+    }
+
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) {
         // We want to test when all topics are removed from the session
         Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
-        Short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
-        Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId);
+        short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 
12;
         FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
 
         // Add topic foo to the session
         FetchSessionHandler.Builder builder = handler.newBuilder();
-        builder.add(new TopicPartition("foo", 0), topicIds.get("foo"),
-                new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
+        builder.add(new TopicPartition("foo", 0),
+                new FetchRequest.PartitionData(topicId, 0, 100, 200, 
Optional.empty()));
         FetchSessionHandler.FetchRequestData data = builder.build();
-        assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
+        assertMapsEqual(reqMap(new ReqEntry("foo", topicId, 0, 0, 100, 200)),
                 data.toSend(), data.sessionPartitions());
         assertTrue(data.metadata().isFull());
         assertEquals(useTopicIds, data.canUseTopicIds());
 
         FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123,
-                respMap(new RespEntry("foo", 0, 10, 20)), topicIds);
-        handler.handleResponse(resp, responseVersion.shortValue());
+                respMap(new RespEntry("foo", 0, topicId, 10, 20)));
+        handler.handleResponse(resp, responseVersion);
 
         // Remove the topic from the session
         FetchSessionHandler.Builder builder2 = handler.newBuilder();
         FetchSessionHandler.FetchRequestData data2 = builder2.build();
-        // Should have the same session ID and next epoch, but can no longer 
use topic IDs.
-        // The receiving broker will close the session if we were previously 
using topic IDs.
+        // Should have the same session ID, next epoch, and same ID usage.
         assertEquals(123, data2.metadata().sessionId(), "Did not use same 
session when useTopicIds was " + useTopicIds);

Review comment:
       Yeah, it would be good to assert what we expect in `data2` for 
completeness.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to