jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r628405414
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
new LinkedHashMap<>(0);
+ /**
+ * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+ */
+ private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+ /**
+ * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+ */
+ private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+ public Map<Uuid, String> sessionTopicNames() {
+ return sessionTopicNames;
+ }
+
+ public boolean requestUsedTopicIds = false;
Review comment:
After running through some tests I realized why this didn't work. Within a
session we can go from version 13 to version 12, but we can't go from 12 to 13,
because we may have topics without IDs in the session. We would try to return
them using version 13, and they would all have the zero UUID. (We have the same
issue when we send a full version 12 request and the subsequent request is
empty. We could try to send a version 13 request, since we vacuously have IDs
for all topics in the request, but if we do have responses for the topics in
the session, we will try to send them back without topic IDs.) If we tried to
resolve the IDs, we could end up in a case where there is no valid ID and also
no way to communicate this (since we send back IDs). So I think we do need to
store the state of the previous request version in the session.
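
To make the rule concrete, here is a minimal sketch of what storing that state
could look like. It is not the code in this PR: the class name
SessionVersionSketch and the methods buildFull and buildIncremental are
hypothetical, and java.util.UUID stands in for Kafka's Uuid. Only the field
names sessionTopicIds and requestUsedTopicIds come from the diff above. The
return value says whether the request may use topic IDs (version 13) or must
fall back to topic names (version 12).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration only; not the FetchSessionHandler implementation.
public final class SessionVersionSketch {
    // Assumption: stand-in for the zero UUID that marks "no topic ID known".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Topic name -> topic ID for every topic in the session.
    private final Map<String, UUID> sessionTopicIds = new HashMap<>();

    // Whether the previous request in this session used topic IDs.
    private boolean requestUsedTopicIds = false;

    private static boolean allHaveIds(Map<String, UUID> topics) {
        return topics.values().stream().noneMatch(ZERO_UUID::equals);
    }

    // A full request starts a new session, so either format may be chosen.
    public boolean buildFull(Map<String, UUID> topics) {
        sessionTopicIds.clear();
        sessionTopicIds.putAll(topics);
        requestUsedTopicIds = allHaveIds(topics);
        return requestUsedTopicIds;
    }

    // An incremental request may downgrade (IDs -> names) but never upgrade,
    // even when it is empty and so vacuously has IDs for all of its topics.
    public boolean buildIncremental(Map<String, UUID> added) {
        sessionTopicIds.putAll(added);
        requestUsedTopicIds = requestUsedTopicIds && allHaveIds(added);
        return requestUsedTopicIds;
    }
}
```

With this shape, an empty incremental request that follows a full request
without topic IDs stays on the name-based format, which is the case described
in the parenthetical above. Moving back to topic IDs requires a new full
request.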