jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743241513
########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; Review comment: I'm not sure I follow. Did you mean the other test file? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org