This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c74b437d0081dbd9b3158ec10c9a54780d57e6a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 8 08:19:07 2019 -0700

    KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (#7449)
    
    The deleteRecords API in the AdminClient groups the requested deletions by
    partition leader and sends one request to each leader. If one of these
    requests fails, we currently fail all futures, including those tied to
    requests sent to other leaders. It would be better to fail only those
    partitions included in the failed request.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 73 ++++++++++----------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 80 +++++++++++++++++++---
 .../test/java/org/apache/kafka/test/TestUtils.java | 17 +++++
 3 files changed, 127 insertions(+), 43 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a7663da..d4958f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -80,9 +80,9 @@ import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@@ -109,21 +109,21 @@ import 
org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
-import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -144,8 +144,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest;
 import org.apache.kafka.common.requests.ElectLeadersResponse;
 import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
@@ -158,7 +158,6 @@ import 
org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetDeleteRequest;
-import org.apache.kafka.common.requests.OffsetDeleteRequest.Builder;
 import org.apache.kafka.common.requests.OffsetDeleteResponse;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -190,21 +189,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.TreeMap;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
-import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
+import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
-import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static 
org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -315,9 +315,18 @@ public class KafkaAdminClient extends AdminClient {
      * @param <T>       The KafkaFutureImpl result type.
      */
     private static <T> void 
completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) 
{
-        for (KafkaFutureImpl<?> future : futures) {
-            future.completeExceptionally(exc);
-        }
+        completeAllExceptionally(futures.stream(), exc);
+    }
+
+    /**
+     * Send an exception to all futures in the provided stream
+     *
+     * @param futures   The stream of KafkaFutureImpl objects.
+     * @param exc       The exception
+     * @param <T>       The KafkaFutureImpl result type.
+     */
+    private static <T> void 
completeAllExceptionally(Stream<KafkaFutureImpl<T>> futures, Throwable exc) {
+        futures.forEach(future -> future.completeExceptionally(exc));
     }
 
     /**
@@ -2364,39 +2373,31 @@ public class KafkaAdminClient extends AdminClient {
                 Map<String, Errors> errors = response.errors();
                 Cluster cluster = response.cluster();
 
-                // completing futures for topics with errors
-                for (Map.Entry<String, Errors> topicError: errors.entrySet()) {
-
-                    for (Map.Entry<TopicPartition, 
KafkaFutureImpl<DeletedRecords>> future: futures.entrySet()) {
-                        if 
(future.getKey().topic().equals(topicError.getKey())) {
-                            
future.getValue().completeExceptionally(topicError.getValue().exception());
-                        }
-                    }
-                }
-
-                // grouping topic partitions per leader
+                // Group topic partitions by leader
                 Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<>();
                 for (Map.Entry<TopicPartition, RecordsToDelete> entry: 
recordsToDelete.entrySet()) {
+                    KafkaFutureImpl<DeletedRecords> future = 
futures.get(entry.getKey());
 
-                    // avoiding to send deletion request for topics with errors
-                    if (!errors.containsKey(entry.getKey().topic())) {
-
+                    // Fail partitions with topic errors
+                    Errors topicError = errors.get(entry.getKey().topic());
+                    if (errors.containsKey(entry.getKey().topic())) {
+                        future.completeExceptionally(topicError.exception());
+                    } else {
                         Node node = cluster.leaderFor(entry.getKey());
                         if (node != null) {
                             if (!leaders.containsKey(node))
                                 leaders.put(node, new HashMap<>());
                             leaders.get(node).put(entry.getKey(), 
entry.getValue().beforeOffset());
                         } else {
-                            KafkaFutureImpl<DeletedRecords> future = 
futures.get(entry.getKey());
                             
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
                         }
                     }
                 }
 
-                for (final Map.Entry<Node, Map<TopicPartition, Long>> entry: 
leaders.entrySet()) {
-
-                    final long nowDelete = time.milliseconds();
+                final long deleteRecordsCallTimeMs = time.milliseconds();
 
+                for (final Map.Entry<Node, Map<TopicPartition, Long>> entry : 
leaders.entrySet()) {
+                    final Map<TopicPartition, Long> partitionDeleteOffsets = 
entry.getValue();
                     final int brokerId = entry.getKey().id();
 
                     runnable.call(new Call("deleteRecords", deadline,
@@ -2404,7 +2405,7 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new DeleteRecordsRequest.Builder(timeoutMs, 
entry.getValue());
+                            return new DeleteRecordsRequest.Builder(timeoutMs, 
partitionDeleteOffsets);
                         }
 
                         @Override
@@ -2423,9 +2424,11 @@ public class KafkaAdminClient extends AdminClient {
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(futures.values(), 
throwable);
+                            Stream<KafkaFutureImpl<DeletedRecords>> 
callFutures =
+                                    
partitionDeleteOffsets.keySet().stream().map(futures::get);
+                            completeAllExceptionally(callFutures, throwable);
                         }
-                    }, nowDelete);
+                    }, deleteRecordsCallTimeMs);
                 }
             }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b6fece9..6d870fc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.clients.admin;
 
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
@@ -52,13 +50,14 @@ import 
org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
-import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
@@ -68,8 +67,8 @@ import 
org.apache.kafka.common.message.DescribeGroupsResponseData;
 import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
@@ -83,14 +82,14 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -140,18 +139,20 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
-import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
+import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 /**
@@ -882,6 +883,69 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testDeleteRecordsTopicAuthorizationError() {
+        String topic = "foo";
+        TopicPartition partition = new TopicPartition(topic, 0);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            List<MetadataResponse.TopicMetadata> topics = new ArrayList<>();
+            topics.add(new 
MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false,
+                    Collections.emptyList()));
+
+            
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(), 
env.cluster().controller().id(), topics));
+
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<>();
+            recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L));
+            DeleteRecordsResult results = 
env.adminClient().deleteRecords(recordsToDelete);
+
+            
TestUtils.assertFutureThrows(results.lowWatermarks().get(partition), 
TopicAuthorizationException.class);
+        }
+    }
+
+    @Test
+    public void testDeleteRecordsMultipleSends() throws Exception {
+        String topic = "foo";
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        TopicPartition tp1 = new TopicPartition(topic, 1);
+
+        Cluster cluster = mockCluster(0);
+        MockTime time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster)) {
+            List<Node> nodes = cluster.nodes();
+
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new 
ArrayList<>();
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
+                    Optional.of(5), singletonList(nodes.get(0)), 
singletonList(nodes.get(0)),
+                    Collections.emptyList()));
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
+                    Optional.of(5), singletonList(nodes.get(1)), 
singletonList(nodes.get(1)), Collections.emptyList()));
+
+            List<MetadataResponse.TopicMetadata> topicMetadata = new 
ArrayList<>();
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, 
topic, false, partitionMetadata));
+
+            
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(),
+                    cluster.clusterResource().clusterId(), 
cluster.controller().id(), topicMetadata));
+
+            Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> 
deletedPartitions = new HashMap<>();
+            deletedPartitions.put(tp0, new 
DeleteRecordsResponse.PartitionResponse(3, Errors.NONE));
+            env.kafkaClient().prepareResponseFrom(new DeleteRecordsResponse(0, 
deletedPartitions), nodes.get(0));
+
+            env.kafkaClient().disconnect(nodes.get(1).idString());
+            env.kafkaClient().createPendingAuthenticationError(nodes.get(1), 
100);
+
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<>();
+            recordsToDelete.put(tp0, RecordsToDelete.beforeOffset(10L));
+            recordsToDelete.put(tp1, RecordsToDelete.beforeOffset(10L));
+            DeleteRecordsResult results = 
env.adminClient().deleteRecords(recordsToDelete);
+
+            assertEquals(3L, 
results.lowWatermarks().get(tp0).get().lowWatermark());
+            TestUtils.assertFutureThrows(results.lowWatermarks().get(tp1), 
AuthenticationException.class);
+        }
+    }
+
+    @Test
     public void testDeleteRecords() throws Exception {
 
         HashMap<Integer, Node> nodes = new HashMap<>();
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 5858a16..2183e15 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -64,6 +64,7 @@ import java.util.regex.Pattern;
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -457,6 +458,22 @@ public class TestUtils {
         return tps;
     }
 
+    /**
+     * Assert that a future raises an expected exception cause type. Return 
the exception cause
+     * if the assertion succeeds; otherwise raise AssertionError.
+     *
+     * @param future The future to await
+     * @param exceptionCauseClass Class of the expected exception cause
+     * @param <T> Exception cause type parameter
+     * @return The caught exception cause
+     */
+    public static <T extends Throwable> T assertFutureThrows(Future<?> future, 
Class<T> exceptionCauseClass) {
+        ExecutionException exception = assertThrows(ExecutionException.class, 
future::get);
+        assertTrue("Unexpected exception cause " + exception.getCause(),
+                exceptionCauseClass.isInstance(exception.getCause()));
+        return exceptionCauseClass.cast(exception.getCause());
+    }
+
     public static void assertFutureError(Future<?> future, Class<? extends 
Throwable> exceptionClass)
         throws InterruptedException {
         try {

Reply via email to