This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c74b437d0081dbd9b3158ec10c9a54780d57e6a5 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Tue Oct 8 08:19:07 2019 -0700 KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (#7449) The deleteRecords API in the AdminClient groups records to be sent by the partition leaders. If one of these requests fails, we currently fail all futures, including those tied to requests sent to other leaders. It would be better to fail only those partitions included in the failed request. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../kafka/clients/admin/KafkaAdminClient.java | 73 ++++++++++---------- .../kafka/clients/admin/KafkaAdminClientTest.java | 80 +++++++++++++++++++--- .../test/java/org/apache/kafka/test/TestUtils.java | 17 +++++ 3 files changed, 127 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index a7663da..d4958f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -80,9 +80,9 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection; -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; @@ -109,21 +109,21 @@ import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsRequest; -import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsResponse; +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreateDelegationTokenRequest; import org.apache.kafka.common.requests.CreateDelegationTokenResponse; -import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsRequest; +import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; -import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsRequest; @@ -144,8 +144,8 @@ import org.apache.kafka.common.requests.ElectLeadersRequest; import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; -import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; @@ -158,7 +158,6 @@ import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.OffsetDeleteRequest; -import org.apache.kafka.common.requests.OffsetDeleteRequest.Builder; import org.apache.kafka.common.requests.OffsetDeleteResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; @@ -190,21 +189,22 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.TreeMap; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition; -import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; +import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; import static org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics; -import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -315,9 +315,18 @@ public class KafkaAdminClient extends AdminClient { * @param <T> The KafkaFutureImpl result type. */ private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) { - for (KafkaFutureImpl<?> future : futures) { - future.completeExceptionally(exc); - } + completeAllExceptionally(futures.stream(), exc); + } + + /** + * Send an exception to all futures in the provided stream + * + * @param futures The stream of KafkaFutureImpl objects. + * @param exc The exception + * @param <T> The KafkaFutureImpl result type. + */ + private static <T> void completeAllExceptionally(Stream<KafkaFutureImpl<T>> futures, Throwable exc) { + futures.forEach(future -> future.completeExceptionally(exc)); } /** @@ -2364,39 +2373,31 @@ public class KafkaAdminClient extends AdminClient { Map<String, Errors> errors = response.errors(); Cluster cluster = response.cluster(); - // completing futures for topics with errors - for (Map.Entry<String, Errors> topicError: errors.entrySet()) { - - for (Map.Entry<TopicPartition, KafkaFutureImpl<DeletedRecords>> future: futures.entrySet()) { - if (future.getKey().topic().equals(topicError.getKey())) { - future.getValue().completeExceptionally(topicError.getValue().exception()); - } - } - } - - // grouping topic partitions per leader + // Group topic partitions by leader Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<>(); for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) { + KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey()); - // avoiding to send deletion request for topics with errors - if (!errors.containsKey(entry.getKey().topic())) { - + // Fail partitions with topic errors + Errors topicError = errors.get(entry.getKey().topic()); + if (errors.containsKey(entry.getKey().topic())) { + future.completeExceptionally(topicError.exception()); + } else { Node node = cluster.leaderFor(entry.getKey()); if (node != null) { if (!leaders.containsKey(node)) leaders.put(node, new HashMap<>()); leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset()); } else { - KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey()); future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception()); } } } - for (final Map.Entry<Node, Map<TopicPartition, Long>> entry: leaders.entrySet()) { - - final long nowDelete = time.milliseconds(); + final long deleteRecordsCallTimeMs = time.milliseconds(); + for (final Map.Entry<Node, Map<TopicPartition, Long>> entry : leaders.entrySet()) { + final Map<TopicPartition, Long> partitionDeleteOffsets = entry.getValue(); final int brokerId = entry.getKey().id(); runnable.call(new Call("deleteRecords", deadline, @@ -2404,7 +2405,7 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new DeleteRecordsRequest.Builder(timeoutMs, entry.getValue()); + return new DeleteRecordsRequest.Builder(timeoutMs, partitionDeleteOffsets); } @Override @@ -2423,9 +2424,11 @@ public class KafkaAdminClient extends AdminClient { @Override void handleFailure(Throwable throwable) { - completeAllExceptionally(futures.values(), throwable); + Stream<KafkaFutureImpl<DeletedRecords>> callFutures = + partitionDeleteOffsets.keySet().stream().map(futures::get); + completeAllExceptionally(callFutures, throwable); } - }, nowDelete); + }, deleteRecordsCallTimeMs); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b6fece9..6d870fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.clients.admin; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.MockClient; @@ -52,13 +50,14 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; -import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection; @@ -68,8 +67,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; -import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; @@ -83,14 +82,14 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreateAclsResponse; +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; -import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsRequest; @@ -140,18 +139,20 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse; import static org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse; -import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment; +import static org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; /** @@ -882,6 +883,69 @@ public class KafkaAdminClientTest { } @Test + public void testDeleteRecordsTopicAuthorizationError() { + String topic = "foo"; + TopicPartition partition = new TopicPartition(topic, 0); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + List<MetadataResponse.TopicMetadata> topics = new ArrayList<>(); + topics.add(new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, + Collections.emptyList())); + + env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), env.cluster().controller().id(), topics)); + + Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>(); + recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L)); + DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); + + TestUtils.assertFutureThrows(results.lowWatermarks().get(partition), TopicAuthorizationException.class); + } + } + + @Test + public void testDeleteRecordsMultipleSends() throws Exception { + String topic = "foo"; + TopicPartition tp0 = new TopicPartition(topic, 0); + TopicPartition tp1 = new TopicPartition(topic, 1); + + Cluster cluster = mockCluster(0); + MockTime time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) { + List<Node> nodes = cluster.nodes(); + + List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(); + partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0), + Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)), + Collections.emptyList())); + partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1), + Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)), Collections.emptyList())); + + List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>(); + topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false, partitionMetadata)); + + env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(), + cluster.clusterResource().clusterId(), cluster.controller().id(), topicMetadata)); + + Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> deletedPartitions = new HashMap<>(); + deletedPartitions.put(tp0, new DeleteRecordsResponse.PartitionResponse(3, Errors.NONE)); + env.kafkaClient().prepareResponseFrom(new DeleteRecordsResponse(0, deletedPartitions), nodes.get(0)); + + env.kafkaClient().disconnect(nodes.get(1).idString()); + env.kafkaClient().createPendingAuthenticationError(nodes.get(1), 100); + + Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>(); + recordsToDelete.put(tp0, RecordsToDelete.beforeOffset(10L)); + recordsToDelete.put(tp1, RecordsToDelete.beforeOffset(10L)); + DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); + + assertEquals(3L, results.lowWatermarks().get(tp0).get().lowWatermark()); + TestUtils.assertFutureThrows(results.lowWatermarks().get(tp1), AuthenticationException.class); + } + } + + @Test public void testDeleteRecords() throws Exception { HashMap<Integer, Node> nodes = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 5858a16..2183e15 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -64,6 +64,7 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -457,6 +458,22 @@ public class TestUtils { return tps; } + /** + * Assert that a future raises an expected exception cause type. Return the exception cause + * if the assertion succeeds; otherwise raise AssertionError. + * + * @param future The future to await + * @param exceptionCauseClass Class of the expected exception cause + * @param <T> Exception cause type parameter + * @return The caught exception cause + */ + public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> exceptionCauseClass) { + ExecutionException exception = assertThrows(ExecutionException.class, future::get); + assertTrue("Unexpected exception cause " + exception.getCause(), + exceptionCauseClass.isInstance(exception.getCause())); + return exceptionCauseClass.cast(exception.getCause()); + } + public static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass) throws InterruptedException { try {