jolshan commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r728455837
##########
File path:
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -249,115 +215,126 @@ public String toString() {
* Another reason is because we make use of the list ordering to
optimize the preparation of
* incremental fetch requests (see below).
*/
- private LinkedHashMap<TopicPartition, PartitionData> next;
- private Map<String, Uuid> topicIds;
+ private LinkedHashMap<TopicIdPartition, PartitionData> next;
private final boolean copySessionPartitions;
private int partitionsWithoutTopicIds = 0;
+ private int partitionsWithTopicIds = 0;
Builder() {
this.next = new LinkedHashMap<>();
- this.topicIds = new HashMap<>();
this.copySessionPartitions = true;
}
Builder(int initialSize, boolean copySessionPartitions) {
this.next = new LinkedHashMap<>(initialSize);
- this.topicIds = new HashMap<>(initialSize);
this.copySessionPartitions = copySessionPartitions;
}
/**
* Mark that we want data from this partition in the upcoming fetch.
*/
- public void add(TopicPartition topicPartition, Uuid topicId,
PartitionData data) {
- next.put(topicPartition, data);
- // topicIds should not change between adding partitions and
building, so we can use putIfAbsent
- if (!topicId.equals(Uuid.ZERO_UUID)) {
- topicIds.putIfAbsent(topicPartition.topic(), topicId);
- } else {
+ public void add(TopicIdPartition topicIdPartition, PartitionData data)
{
+ next.put(topicIdPartition, data);
+ if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
partitionsWithoutTopicIds++;
+ } else {
+ partitionsWithTopicIds++;
+ }
+ }
+
+ private Map<TopicIdPartition, PartitionData> buildFullSession(boolean
canUseTopicIds) {
+ if (log.isDebugEnabled()) {
+ log.debug("Built full fetch {} for node {} with {}.",
+ nextMetadata, node,
partitionsToLogString(next.keySet()));
}
+ sessionPartitions = next;
+ next = null;
+ // Only add topic IDs to the session if we are using topic IDs.
+ sessionTopicNames = new HashMap<>();
+ if (canUseTopicIds) {
+ Map<Uuid, Set<String>> newTopicNames =
sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+ Collectors.mapping(topicIdPartition ->
topicIdPartition.topicPartition().topic(), Collectors.toSet())));
+
+ sessionTopicNames = new HashMap<>(newTopicNames.size());
+ // There should only be one topic name per topic ID.
+ newTopicNames.forEach((topicId, topicNamesSet) ->
topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+ } else {
+ sessionTopicNames = new HashMap<>();
+ }
+ return Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
}
public FetchRequestData build() {
boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
if (nextMetadata.isFull()) {
- if (log.isDebugEnabled()) {
- log.debug("Built full fetch {} for node {} with {}.",
- nextMetadata, node,
partitionsToLogString(next.keySet()));
- }
- sessionPartitions = next;
- next = null;
- // Only add topic IDs to the session if we are using topic IDs.
- if (canUseTopicIds) {
- sessionTopicIds = topicIds;
- sessionTopicNames = new HashMap<>(topicIds.size());
- topicIds.forEach((name, id) -> sessionTopicNames.put(id,
name));
- } else {
- sessionTopicIds = new HashMap<>();
- sessionTopicNames = new HashMap<>();
- }
- topicIds = null;
- Map<TopicPartition, PartitionData> toSend =
- Collections.unmodifiableMap(new
LinkedHashMap<>(sessionPartitions));
- Map<String, Uuid> toSendTopicIds =
- Collections.unmodifiableMap(new
HashMap<>(sessionTopicIds));
- Map<Uuid, String> toSendTopicNames =
- Collections.unmodifiableMap(new
HashMap<>(sessionTopicNames));
- return new FetchRequestData(toSend, Collections.emptyList(),
toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+ Map<TopicIdPartition, PartitionData> toSend =
buildFullSession(canUseTopicIds);
+ return new FetchRequestData(toSend, Collections.emptyList(),
toSend, nextMetadata, canUseTopicIds);
}
- List<TopicPartition> added = new ArrayList<>();
- List<TopicPartition> removed = new ArrayList<>();
- List<TopicPartition> altered = new ArrayList<>();
- for (Iterator<Entry<TopicPartition, PartitionData>> iter =
+ // If we were previously using a session without IDs and an ID was
added to the builder, we will close the current session and open a new one with
IDs.
+ // Same if vice versa.
+ boolean closeSessionDueToTopicIdChange = (requestUsedTopicIds &&
partitionsWithoutTopicIds > 0) || (!requestUsedTopicIds &&
partitionsWithTopicIds > 0);
+
+ if (closeSessionDueToTopicIdChange) {
+ canUseTopicIds = partitionsWithTopicIds > 0;
+ Map<TopicIdPartition, PartitionData> toSend =
buildFullSession(canUseTopicIds);
+ if (canUseTopicIds && partitionsWithoutTopicIds == 0 ||
!canUseTopicIds && partitionsWithTopicIds == 0)
+ return new FetchRequestData(toSend,
Collections.emptyList(), toSend, nextMetadata.nextCloseExisting(),
!requestUsedTopicIds);
+ Map<TopicIdPartition, PartitionData> emptyMap = new
LinkedHashMap<>();
+ return new FetchRequestData(emptyMap, Collections.emptyList(),
emptyMap, nextMetadata.closeExisting(), !requestUsedTopicIds);
+ }
+
+ List<TopicIdPartition> added = new ArrayList<>();
+ List<TopicIdPartition> removed = new ArrayList<>();
+ List<TopicIdPartition> altered = new ArrayList<>();
+ for (Iterator<Entry<TopicIdPartition, PartitionData>> iter =
sessionPartitions.entrySet().iterator(); iter.hasNext();
) {
- Entry<TopicPartition, PartitionData> entry = iter.next();
- TopicPartition topicPartition = entry.getKey();
+ Entry<TopicIdPartition, PartitionData> entry = iter.next();
+ TopicIdPartition topicIdPartition = entry.getKey();
PartitionData prevData = entry.getValue();
- PartitionData nextData = next.remove(topicPartition);
+ PartitionData nextData = next.remove(topicIdPartition);
if (nextData != null) {
if (!prevData.equals(nextData)) {
// Re-add the altered partition to the end of 'next'
- next.put(topicPartition, nextData);
+ next.put(topicIdPartition, nextData);
entry.setValue(nextData);
- altered.add(topicPartition);
+ altered.add(topicIdPartition);
}
} else {
// Remove this partition from the session.
iter.remove();
// Indicate that we no longer want to listen to this
partition.
- removed.add(topicPartition);
+ removed.add(topicIdPartition);
Review comment:
I think we may even be able to get away with fewer maps. I see that in your
commit we add to topicIDs at the start, but I'm not sure that works if we have
more than one ID for a topic. I was thinking that if we stored the ID in the
fetch data, we wouldn't need to build a map from IDs to names. Do we still use
that map anywhere?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]