dajac commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r727909392
########## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ########## @@ -249,115 +215,126 @@ public String toString() { * Another reason is because we make use of the list ordering to optimize the preparation of * incremental fetch requests (see below). */ - private LinkedHashMap<TopicPartition, PartitionData> next; - private Map<String, Uuid> topicIds; + private LinkedHashMap<TopicIdPartition, PartitionData> next; private final boolean copySessionPartitions; private int partitionsWithoutTopicIds = 0; + private int partitionsWithTopicIds = 0; Builder() { this.next = new LinkedHashMap<>(); - this.topicIds = new HashMap<>(); this.copySessionPartitions = true; } Builder(int initialSize, boolean copySessionPartitions) { this.next = new LinkedHashMap<>(initialSize); - this.topicIds = new HashMap<>(initialSize); this.copySessionPartitions = copySessionPartitions; } /** * Mark that we want data from this partition in the upcoming fetch. */ - public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) { - next.put(topicPartition, data); - // topicIds should not change between adding partitions and building, so we can use putIfAbsent - if (!topicId.equals(Uuid.ZERO_UUID)) { - topicIds.putIfAbsent(topicPartition.topic(), topicId); - } else { + public void add(TopicIdPartition topicIdPartition, PartitionData data) { + next.put(topicIdPartition, data); + if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) { partitionsWithoutTopicIds++; + } else { + partitionsWithTopicIds++; + } + } + + private Map<TopicIdPartition, PartitionData> buildFullSession(boolean canUseTopicIds) { + if (log.isDebugEnabled()) { + log.debug("Built full fetch {} for node {} with {}.", + nextMetadata, node, partitionsToLogString(next.keySet())); } + sessionPartitions = next; + next = null; + // Only add topic IDs to the session if we are using topic IDs. 
+ sessionTopicNames = new HashMap<>(); + if (canUseTopicIds) { + Map<Uuid, Set<String>> newTopicNames = sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId, + Collectors.mapping(topicIdPartition -> topicIdPartition.topicPartition().topic(), Collectors.toSet()))); + + sessionTopicNames = new HashMap<>(newTopicNames.size()); + // There should only be one topic name per topic ID. + newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName))); + } else { + sessionTopicNames = new HashMap<>(); + } + return Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); } public FetchRequestData build() { boolean canUseTopicIds = partitionsWithoutTopicIds == 0; if (nextMetadata.isFull()) { - if (log.isDebugEnabled()) { - log.debug("Built full fetch {} for node {} with {}.", - nextMetadata, node, partitionsToLogString(next.keySet())); - } - sessionPartitions = next; - next = null; - // Only add topic IDs to the session if we are using topic IDs. 
- if (canUseTopicIds) { - sessionTopicIds = topicIds; - sessionTopicNames = new HashMap<>(topicIds.size()); - topicIds.forEach((name, id) -> sessionTopicNames.put(id, name)); - } else { - sessionTopicIds = new HashMap<>(); - sessionTopicNames = new HashMap<>(); - } - topicIds = null; - Map<TopicPartition, PartitionData> toSend = - Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); - Map<String, Uuid> toSendTopicIds = - Collections.unmodifiableMap(new HashMap<>(sessionTopicIds)); - Map<Uuid, String> toSendTopicNames = - Collections.unmodifiableMap(new HashMap<>(sessionTopicNames)); - return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds); + Map<TopicIdPartition, PartitionData> toSend = buildFullSession(canUseTopicIds); + return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata, canUseTopicIds); } - List<TopicPartition> added = new ArrayList<>(); - List<TopicPartition> removed = new ArrayList<>(); - List<TopicPartition> altered = new ArrayList<>(); - for (Iterator<Entry<TopicPartition, PartitionData>> iter = + // If we were previously using a session without IDs and an ID was added to the builder, we will close the current session and open a new one with IDs. + // Same if vice versa. 
+ boolean closeSessionDueToTopicIdChange = (requestUsedTopicIds && partitionsWithoutTopicIds > 0) || (!requestUsedTopicIds && partitionsWithTopicIds > 0); + + if (closeSessionDueToTopicIdChange) { + canUseTopicIds = partitionsWithTopicIds > 0; + Map<TopicIdPartition, PartitionData> toSend = buildFullSession(canUseTopicIds); + if (canUseTopicIds && partitionsWithoutTopicIds == 0 || !canUseTopicIds && partitionsWithTopicIds == 0) + return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata.nextCloseExisting(), !requestUsedTopicIds); + Map<TopicIdPartition, PartitionData> emptyMap = new LinkedHashMap<>(); + return new FetchRequestData(emptyMap, Collections.emptyList(), emptyMap, nextMetadata.closeExisting(), !requestUsedTopicIds); + } + + List<TopicIdPartition> added = new ArrayList<>(); + List<TopicIdPartition> removed = new ArrayList<>(); + List<TopicIdPartition> altered = new ArrayList<>(); + for (Iterator<Entry<TopicIdPartition, PartitionData>> iter = sessionPartitions.entrySet().iterator(); iter.hasNext(); ) { - Entry<TopicPartition, PartitionData> entry = iter.next(); - TopicPartition topicPartition = entry.getKey(); + Entry<TopicIdPartition, PartitionData> entry = iter.next(); + TopicIdPartition topicIdPartition = entry.getKey(); PartitionData prevData = entry.getValue(); - PartitionData nextData = next.remove(topicPartition); + PartitionData nextData = next.remove(topicIdPartition); if (nextData != null) { if (!prevData.equals(nextData)) { // Re-add the altered partition to the end of 'next' - next.put(topicPartition, nextData); + next.put(topicIdPartition, nextData); entry.setValue(nextData); - altered.add(topicPartition); + altered.add(topicIdPartition); } } else { // Remove this partition from the session. iter.remove(); // Indicate that we no longer want to listen to this partition. - removed.add(topicPartition); + removed.add(topicIdPartition); Review comment: I have simplified the code and removed a few Maps along the way. 
Here is the diff: https://github.com/apache/kafka/compare/trunk...dajac:KAFKA-13111. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) Review comment: @jolshan With Ismael's PR (https://github.com/apache/kafka/pull/11374), this trick does not work any more. We need to think about an alternative/better approach. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) Review comment: Actually, you're right. That is not entirely true. I thought that the `requireNonNull` for the `topic` in one of the [constructors](https://github.com/apache/kafka/pull/11374/files#diff-3d6aa1dec2a2548f28148717926536cc937acec2ab4bd03a7bcdc58c84a6cbbaR38) would prevent this from working. However, as we use the other `TopicIdPartition` constructor here, it is not impacted by the `requireNonNull`. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) Review comment: In this case, it would be nice if we had a `TopicIdPartition` that contained an optional topic name. For context, the issue is that we might have partitions in the fetch requests for which the topic name is unknown or not yet known by the broker. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) Review comment: Actually, you're right. That is not entirely true. I thought that the `requireNonNull` for the `topic` in one of the [constructors](https://github.com/apache/kafka/pull/11374/files#diff-3d6aa1dec2a2548f28148717926536cc937acec2ab4bd03a7bcdc58c84a6cbbaR38) would prevent this from working. However, as we use the other `TopicIdPartition` constructor in this case, it is not impacted by the `requireNonNull`. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -708,40 +701,41 @@ class KafkaApis(val requestChannel: RequestChannel, None } - val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponseData.PartitionData)]() - val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - val sessionTopicIds = mutable.Map[String, Uuid]() + val erroneous = mutable.ArrayBuffer[(TopicIdPartition, FetchResponseData.PartitionData)]() + val interesting = mutable.ArrayBuffer[(TopicIdPartition, FetchRequest.PartitionData)]() if (fetchRequest.isFromFollower) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - fetchContext.foreachPartition { (topicPartition, topicId, data) => - sessionTopicIds.put(topicPartition.topic(), topicId) - if (!metadataCache.contains(topicPartition)) - erroneous += topicPartition -> FetchResponse.partitionResponse(topicPartition.partition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + fetchContext.foreachPartition { (topicIdPartition, data) => + if (topicIdPartition.topicPartition.topic == null ) Review comment: Sounds good, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org