dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r743749915
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -199,26 +235,31 @@ public FetchRequest build(short version) {
             fetchRequestData.setMaxBytes(maxBytes);
             fetchRequestData.setIsolationLevel(isolationLevel.id());
             fetchRequestData.setForgottenTopicsData(new ArrayList<>());
-            toForget.stream()
-                .collect(Collectors.groupingBy(TopicPartition::topic, LinkedHashMap::new, Collectors.toList()))
-                .forEach((topic, partitions) ->
-                    fetchRequestData.forgottenTopicsData().add(new FetchRequestData.ForgottenTopic()
-                        .setTopic(topic)
-                        .setTopicId(topicIds.getOrDefault(topic, Uuid.ZERO_UUID))
-                        .setPartitions(partitions.stream().map(TopicPartition::partition).collect(Collectors.toList())))
-                );
-            fetchRequestData.setTopics(new ArrayList<>());
+
+            Map<String, FetchRequestData.ForgottenTopic> forgottenTopicMap = new LinkedHashMap<>();
+            addToForgottenTopicMap(removed, forgottenTopicMap);
+
+            // If a version older than v13 is used, topic-partition which were replaced
+            // by a topic-partition with the same name but a different topic ID are not
+            // sent out in the "forget" set in order to not remove the newly added
+            // partition in the "fetch" set.
+            if (version >= 13) {
+                addToForgottenTopicMap(replaced, forgottenTopicMap);
+            }

Review comment:
Should we add a few unit tests to validate the changes that we have made in this class? We could add a few to FetchRequestTest (not sure if it already exists, though).
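
For instance, a test could pin down the version gate shown in the hunk above: replaced partitions should only reach the "forget" set for v13 and newer. The following is a minimal, self-contained sketch of that check, not the actual test. It deliberately does not use the real `FetchRequest.Builder` API; `ForgottenTopicsSketch`, `Partition`, and `forgottenFor` are hypothetical stand-ins, and only the `version >= 13` condition and the `addToForgottenTopicMap` name are taken from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "forget" set logic; not Kafka code.
class ForgottenTopicsSketch {

    // Hypothetical stand-in for a topic-partition (topic name plus partition index).
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Groups partition indexes by topic name, preserving insertion order.
    static void addToForgottenTopicMap(List<Partition> partitions,
                                       Map<String, List<Integer>> forgottenTopicMap) {
        for (Partition p : partitions) {
            forgottenTopicMap
                .computeIfAbsent(p.topic, t -> new ArrayList<>())
                .add(p.partition);
        }
    }

    // Removed partitions are always forgotten; replaced partitions are only
    // forgotten when the request version is 13 or newer.
    static Map<String, List<Integer>> forgottenFor(short version,
                                                   List<Partition> removed,
                                                   List<Partition> replaced) {
        Map<String, List<Integer>> forgottenTopicMap = new LinkedHashMap<>();
        addToForgottenTopicMap(removed, forgottenTopicMap);
        if (version >= 13) {
            addToForgottenTopicMap(replaced, forgottenTopicMap);
        }
        return forgottenTopicMap;
    }

    public static void main(String[] args) {
        List<Partition> removed = Arrays.asList(new Partition("foo", 0));
        List<Partition> replaced = Arrays.asList(new Partition("foo", 1));

        // Print the "forget" set on either side of the version boundary.
        System.out.println("v12: " + forgottenFor((short) 12, removed, replaced));
        System.out.println("v13: " + forgottenFor((short) 13, removed, replaced));
    }
}
```

A real test in FetchRequestTest would presumably build the request for each version and assert on `forgottenTopicsData()` instead, ideally parameterized over all supported versions.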