dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743576928
########## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ########## @@ -428,82 +442,165 @@ public void testIdUsageRevokedOnIdDowngrade() { List<Integer> partitions = Arrays.asList(0, 1); partitions.forEach(partition -> { String testType = partition == 0 ? "updating a partition" : "adding a new partition"; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", Uuid.randomUuid()); + Uuid fooId = Uuid.randomUuid(); FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertTrue(data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); + respMap(new RespEntry("foo", 0, fooId, 10, 20))); handler.handleResponse(resp, ApiKeys.FETCH.latestVersion()); // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); - builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, - new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); + builder2.add(new TopicPartition("foo", partition), + new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, and next epoch and can no longer use topic IDs. + // The receiving broker will handle closing the session. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTopicIdReplaced(boolean fetchRequestUsesIds) { + TopicPartition tp = new TopicPartition("foo", 0); + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + Uuid topicId1 = Uuid.randomUuid(); + builder.add(tp, + new FetchRequest.PartitionData(topicId1, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", topicId1, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertTrue(data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10, 20))); + handler.handleResponse(resp, (short) 12); + + // Try to add a new topic ID. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + Uuid topicId2 = Uuid.randomUuid(); + // Use the same data besides the topic ID. + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId2, 0, 100, 200, Optional.empty()); + builder2.add(tp, partitionData); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // The old topic ID partition should be in toReplace, and the new one should be in toSend. + assertMapsEqual(reqMap(new ReqEntry("foo", topicId2, 0, 0, 100, 200)), + data2.toSend(), data2.sessionPartitions()); + assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), data2.toReplace()); + // Should have the same session ID, and next epoch and can use topic IDs. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch"); + assertTrue(data2.canUseTopicIds()); + + short version = fetchRequestUsesIds ? ApiKeys.FETCH.latestVersion() : 12; + FetchRequest fetchRequest = FetchRequest.Builder + .forReplica(version, 0, 1, 1, data2.toSend()) + .removed(data2.toForget()) + .replaced(data2.toReplace()) + .metadata(data2.metadata()).build(version); + + assertEquals(fetchRequestUsesIds, fetchRequest.data().forgottenTopicsData().size() > 0); + assertEquals(1, fetchRequest.data().topics().size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean startsWithTopicIds) { + Uuid fooId = startsWithTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; + Uuid barId = startsWithTopicIds ? Uuid.ZERO_UUID : Uuid.randomUuid(); + short responseVersion = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : 12; + + TopicPartition tp0 = new TopicPartition("foo", 0); + TopicPartition tp1 = new TopicPartition("bar", 1); + + FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + builder.add(tp0, + new FetchRequest.PartitionData(fooId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data = builder.build(); + assertMapsEqual(reqMap(new ReqEntry("foo", fooId, 0, 0, 100, 200)), + data.toSend(), data.sessionPartitions()); + assertTrue(data.metadata().isFull()); + assertEquals(startsWithTopicIds, data.canUseTopicIds()); + + FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, + respMap(new RespEntry("foo", 0, fooId, 10, 20))); + handler.handleResponse(resp, responseVersion); + + // Re-add the first partition. Then add a partition with opposite ID usage. + FetchSessionHandler.Builder builder2 = handler.newBuilder(); + builder2.add(tp0, + new FetchRequest.PartitionData(fooId, 10, 110, 210, Optional.empty())); + builder2.add(tp1, + new FetchRequest.PartitionData(barId, 0, 100, 200, Optional.empty())); + FetchSessionHandler.FetchRequestData data2 = builder2.build(); + // Should have the same session ID, and the next epoch and can not use topic IDs. + // The receiving broker will handle closing the session. + assertEquals(123, data2.metadata().sessionId(), "Did not use same session"); + assertEquals(1, data2.metadata().epoch(), "Did not have final epoch"); + assertFalse(data2.canUseTopicIds()); + } + + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) { // We want to test when all topics are removed from the session Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; - Short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; - Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId); + short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1); // Add topic foo to the session FetchSessionHandler.Builder builder = handler.newBuilder(); - builder.add(new TopicPartition("foo", 0), topicIds.get("foo"), - new FetchRequest.PartitionData(0, 100, 200, Optional.empty())); + builder.add(new TopicPartition("foo", 0), + new FetchRequest.PartitionData(topicId, 0, 100, 200, Optional.empty())); FetchSessionHandler.FetchRequestData data = builder.build(); - assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)), + assertMapsEqual(reqMap(new ReqEntry("foo", topicId, 0, 0, 100, 200)), data.toSend(), data.sessionPartitions()); assertTrue(data.metadata().isFull()); assertEquals(useTopicIds, data.canUseTopicIds()); FetchResponse resp = FetchResponse.of(Errors.NONE, 0, 123, - respMap(new RespEntry("foo", 0, 10, 20)), topicIds); - handler.handleResponse(resp, responseVersion.shortValue()); + respMap(new RespEntry("foo", 0, topicId, 10, 20))); + handler.handleResponse(resp, responseVersion); // Remove the topic from the session FetchSessionHandler.Builder builder2 = handler.newBuilder(); FetchSessionHandler.FetchRequestData data2 = builder2.build(); - // Should have the same session ID and next epoch, but can no longer use topic IDs. - // The receiving broker will close the session if we were previously using topic IDs. + // Should have the same session ID, next epoch, and same ID usage. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when useTopicIds was " + useTopicIds); Review comment: Yeah, it would be good to assert what we expect in `data2` for completeness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org