hachikuji commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r590938521



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Base driver implementation for APIs which target partition leaders.
+ */
+public class PartitionLeaderStrategy implements 
AdminApiLookupStrategy<TopicPartition> {
+    private static final RequestScope SINGLE_REQUEST_SCOPE = new 
RequestScope() {
+    };
+
+    protected final Set<TopicPartition> topicPartitions;
+    private final Logger log;
+
+    public PartitionLeaderStrategy(
+        Set<TopicPartition> topicPartitions,
+        LogContext logContext
+    ) {
+        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
+        this.log = logContext.logger(PartitionLeaderStrategy.class);
+    }
+
+    @Override
+    public RequestScope lookupScope(TopicPartition key) {
+        // Metadata requests can group topic partitions arbitrarily, so they 
can all share
+        // the same request context
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<TopicPartition> 
partitions) {
+        MetadataRequestData request = new MetadataRequestData();
+        request.setAllowAutoTopicCreation(false);
+        Set<String> topics = 
partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+        for (String topic : topics) {
+            request.topics().add(new 
MetadataRequestData.MetadataRequestTopic().setName(topic));
+        }
+        return new MetadataRequest.Builder(request);
+    }
+
+    private void handleTopicError(
+        String topic,
+        Errors topicError,
+        Set<TopicPartition> requestPartitions,
+        Map<TopicPartition, Throwable> failed
+    ) {
+        switch (topicError) {
+            case UNKNOWN_TOPIC_OR_PARTITION:
+            case LEADER_NOT_AVAILABLE:
+            case BROKER_NOT_AVAILABLE:
+                log.debug("Metadata request for topic {} returned topic-level 
error {}. Will retry",
+                    topic, topicError);
+                break;
+
+            case TOPIC_AUTHORIZATION_FAILED:
+                log.error("Received authorization failure for topic {} in 
`Metadata` response", topic,
+                    topicError.exception());
+                failAllPartitionsForTopic(topic, requestPartitions, failed, tp 
-> new TopicAuthorizationException(
+                    "Failed to fetch metadata for partition " + tp + " due to 
topic authorization failure",
+                    Collections.singleton(topic)));
+                break;
+
+            case INVALID_TOPIC_EXCEPTION:
+                log.error("Received invalid topic error for topic {} in 
`Metadata` response", topic,
+                    topicError.exception());
+                failAllPartitionsForTopic(topic, requestPartitions, failed, tp 
-> new InvalidTopicException(
+                    "Failed to fetch metadata for partition " + tp + " due to 
invalid topic `" + topic + "`",
+                    Collections.singleton(topic)));
+                break;
+
+            default:
+                log.error("Received unexpected error for topic {} in 
`Metadata` response", topic,
+                    topicError.exception());
+                failAllPartitionsForTopic(topic, requestPartitions, failed, tp 
-> topicError.exception(
+                    "Failed to fetch metadata for partition " + tp + " due to 
unexpected error for topic `" + topic + "`"));
+        }
+    }
+
+    private void failAllPartitionsForTopic(
+        String topic,
+        Set<TopicPartition> partitions,
+        Map<TopicPartition, Throwable> failed,
+        Function<TopicPartition, Throwable> exceptionGenerator
+    ) {
+        partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp 
-> {
+            failed.put(tp, exceptionGenerator.apply(tp));
+        });
+    }
+
+    private void handlePartitionError(
+        TopicPartition topicPartition,
+        Errors partitionError,
+        Map<TopicPartition, Throwable> failed
+    ) {
+        switch (partitionError) {
+            case NOT_LEADER_OR_FOLLOWER:
+            case REPLICA_NOT_AVAILABLE:
+            case LEADER_NOT_AVAILABLE:
+            case BROKER_NOT_AVAILABLE:
+            case KAFKA_STORAGE_ERROR:
+                log.debug("Metadata request for partition {} returned 
partition-level error {}. Will retry",
+                    topicPartition, partitionError);
+                break;
+
+            default:
+                log.error("Received unexpected error for partition {} in 
`Metadata` response",
+                    topicPartition, partitionError.exception());
+                failed.put(topicPartition, partitionError.exception(
+                    "Unexpected error during metadata lookup for " + 
topicPartition));
+        }
+    }
+
+    @Override
+    public LookupResult<TopicPartition> handleResponse(
+        Set<TopicPartition> requestPartitions,
+        AbstractResponse abstractResponse
+    ) {
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        Map<TopicPartition, Throwable> failed = new HashMap<>();
+        Map<TopicPartition, Integer> mapped = new HashMap<>();
+
+        for (MetadataResponse.TopicMetadata topicMetadata : 
response.topicMetadata()) {
+            Errors topicError = topicMetadata.error();
+            if (topicError != Errors.NONE) {
+                handleTopicError(topicMetadata.topic(), topicError, 
requestPartitions, failed);
+                continue;
+            }
+
+            for (MetadataResponse.PartitionMetadata partitionMetadata : 
topicMetadata.partitionMetadata()) {
+                TopicPartition topicPartition = 
partitionMetadata.topicPartition;
+                Errors partitionError = partitionMetadata.error;
+
+
+                if (!topicPartitions.contains(topicPartition)) {

Review comment:
       Good point. I agree.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to