Repository: kafka Updated Branches: refs/heads/trunk 8dbd688b1 -> 9c34df151
KAFKA-3488; Avoid failing of unsent requests in consumer where possible Fail unsent requests only when returning from KafkaConsumer.poll(). Author: Rajini Sivaram <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1183 from rajinisivaram/KAFKA-3488 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c34df15 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c34df15 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c34df15 Branch: refs/heads/trunk Commit: 9c34df1511a769b272893b75ec1ed90d38cc9576 Parents: 8dbd688 Author: Rajini Sivaram <[email protected]> Authored: Thu Apr 7 15:48:50 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 7 15:48:50 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/AbstractCoordinator.java | 2 + .../consumer/internals/ConsumerCoordinator.java | 3 - .../internals/ConsumerNetworkClient.java | 58 +++++++++++------- .../consumer/internals/SendFailedException.java | 27 --------- .../GroupCoordinatorNotAvailableException.java | 1 + .../internals/ConsumerCoordinatorTest.java | 2 +- .../internals/ConsumerNetworkClientTest.java | 63 +++++++++++++++++++- .../clients/consumer/internals/FetcherTest.java | 2 +- .../runtime/distributed/WorkerGroupMember.java | 3 +- .../distributed/WorkerCoordinatorTest.java | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 23 +++---- 12 files changed, 118 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c457c83..5576431 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -584,7 +584,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time); - this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); + this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); this.subscriptions = new SubscriptionState(offsetResetStrategy); List<PartitionAssignor> assignors = config.getConfiguredInstances( http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index c79d8e7..1e6757e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -521,6 +522,7 @@ public abstract class AbstractCoordinator implements Closeable { protected void coordinatorDead() { if (this.coordinator != null) { log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); + client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE); this.coordinator = null; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index a364987..86b60d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -422,9 +422,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { reschedule(now + interval); - } else if (exception instanceof SendFailedException) { - log.debug("Failed to send automatic offset commit for group {}", groupId); - reschedule(now); } else { log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); reschedule(now + interval); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index b70994d..4119954 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; @@ -40,12 +41,6 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * Higher level consumer access to the network layer with basic support for futures and * task scheduling. NOT thread-safe! - * - * TODO: The current implementation is simplistic in that it provides a facility for queueing requests - * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time - * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to - * understand, but there are opportunities to provide timeout or retry capabilities in the future. - * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior. */ public class ConsumerNetworkClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); @@ -57,17 +52,20 @@ public class ConsumerNetworkClient implements Closeable { private final Metadata metadata; private final Time time; private final long retryBackoffMs; + private final long unsentExpiryMs; // wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently volatile private boolean wakeupsEnabled = true; public ConsumerNetworkClient(KafkaClient client, Metadata metadata, Time time, - long retryBackoffMs) { + long retryBackoffMs, + long requestTimeoutMs) { this.client = client; this.metadata = metadata; this.time = time; this.retryBackoffMs = retryBackoffMs; + this.unsentExpiryMs = requestTimeoutMs; } /** @@ -227,8 +225,8 @@ public class ConsumerNetworkClient implements Closeable { // cleared or a connect finished in the poll trySend(now); - // fail all requests that couldn't be sent - failUnsentRequests(); + // fail requests that couldn't be sent if they have expired + failExpiredRequests(now); } /** @@ -274,29 +272,48 @@ public class ConsumerNetworkClient implements Closeable { Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next(); Node node = requestEntry.getKey(); if (client.connectionFailed(node)) { + // Remove entry before invoking request callback to avoid callbacks handling + // coordinator failures traversing the unsent list again. + iterator.remove(); for (ClientRequest request : requestEntry.getValue()) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.onComplete(new ClientResponse(request, now, true, null)); } - iterator.remove(); } } } - private void failUnsentRequests() { - // clear all unsent requests and fail their corresponding futures - for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) { - Iterator<ClientRequest> iterator = requestEntry.getValue().iterator(); - while (iterator.hasNext()) { - ClientRequest request = iterator.next(); - RequestFutureCompletionHandler handler = - (RequestFutureCompletionHandler) request.callback(); - handler.raise(SendFailedException.INSTANCE); + private void failExpiredRequests(long now) { + // clear all expired unsent requests and fail their corresponding futures + Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next(); + Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator(); + while (requestIterator.hasNext()) { + ClientRequest request = requestIterator.next(); + if (request.createdTimeMs() < now - unsentExpiryMs) { + RequestFutureCompletionHandler handler = + (RequestFutureCompletionHandler) request.callback(); + handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); + requestIterator.remove(); + } else + break; + } + if (requestEntry.getValue().isEmpty()) iterator.remove(); + } + } + + protected void failUnsentRequests(Node node, RuntimeException e) { + // clear unsent requests to node and fail their corresponding futures + List<ClientRequest> unsentRequests = unsent.remove(node); + if (unsentRequests != null) { + for (ClientRequest request : unsentRequests) { + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + handler.raise(e); } } - unsent.clear(); } private boolean trySend(long now) { @@ -320,7 +337,6 @@ public class ConsumerNetworkClient implements Closeable { private void clientPoll(long timeout, long now) { client.poll(timeout, now); if (wakeupsEnabled && wakeup.get()) { - failUnsentRequests(); wakeup.set(false); throw new WakeupException(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java deleted file mode 100644 index 3312a2c..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.common.errors.RetriableException; - -/** - * Exception used in {@link ConsumerNetworkClient} to indicate the failure - * to transmit a request to the networking layer. This could be either because - * the client is still connecting to the given host or its send buffer is full. - */ -public class SendFailedException extends RetriableException { - public static final SendFailedException INSTANCE = new SendFailedException(); - - private static final long serialVersionUID = 1L; - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java index c0949e3..554b885 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.errors; * not yet been created. */ public class GroupCoordinatorNotAvailableException extends RetriableException { + public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException(); private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 623e5ef..2189c30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -103,7 +103,7 @@ public class ConsumerCoordinatorTest { this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE); this.metadata.update(cluster, time.milliseconds()); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.defaultOffsetCommitCallback = new MockCommitCallback(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 1692010..f0f2a97 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -27,6 +27,8 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import org.junit.Test; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -40,7 +42,7 @@ public class ConsumerNetworkClientTest { private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); - private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); @Test public void send() { @@ -104,6 +106,65 @@ public class ConsumerNetworkClientTest { assertTrue(future.isDone()); } + @Test + public void sendExpiry() throws InterruptedException { + long unsentExpiryMs = 10; + final AtomicBoolean isReady = new AtomicBoolean(); + final AtomicBoolean disconnected = new AtomicBoolean(); + client = new MockClient(time) { + @Override + public boolean ready(Node node, long now) { + if (isReady.get()) + return super.ready(node, now); + else + return false; + } + @Override + public boolean connectionFailed(Node node) { + return disconnected.get(); + } + }; + // Queue first send, sleep long enough for this to expire and then queue second send + consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs); + RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest()); + assertEquals(1, consumerClient.pendingRequestCount()); + assertEquals(1, consumerClient.pendingRequestCount(node)); + assertFalse(future1.isDone()); + + time.sleep(unsentExpiryMs + 1); + RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest()); + assertEquals(2, consumerClient.pendingRequestCount()); + assertEquals(2, consumerClient.pendingRequestCount(node)); + assertFalse(future2.isDone()); + + // First send should have expired and second send still pending + consumerClient.poll(0); + assertTrue(future1.isDone()); + assertFalse(future1.succeeded()); + assertEquals(1, consumerClient.pendingRequestCount()); + assertEquals(1, consumerClient.pendingRequestCount(node)); + assertFalse(future2.isDone()); + + // Enable send, the un-expired send should succeed on poll + isReady.set(true); + client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + consumerClient.poll(future2); + ClientResponse clientResponse = future2.value(); + HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody()); + assertEquals(Errors.NONE.code(), response.errorCode()); + + // Disable ready flag to delay send and queue another send. Disconnection should remove pending send + isReady.set(false); + RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest()); + assertEquals(1, consumerClient.pendingRequestCount()); + assertEquals(1, consumerClient.pendingRequestCount(node)); + disconnected.set(true); + consumerClient.poll(0); + assertTrue(future3.isDone()); + assertFalse(future3.succeeded()); + assertEquals(0, consumerClient.pendingRequestCount()); + assertEquals(0, consumerClient.pendingRequestCount(node)); + } private HeartbeatRequest heartbeatRequest() { return new HeartbeatRequest("group", 1, "memberId"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 58c3841..9002e81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -86,7 +86,7 @@ public class FetcherTest { private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE); private Metrics metrics = new Metrics(time); private static final double EPSILON = 0.0001; - private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 7294ed4..57028ef 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -99,7 +99,8 @@ public class WorkerGroupMember { config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG), config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG), config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time); - this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); + this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, + config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)); this.coordinator = new WorkerCoordinator(this.client, config.getString(DistributedConfig.GROUP_ID_CONFIG), config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG), http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index abb62b9..bf33cb3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -88,7 +88,7 @@ public class WorkerCoordinatorTest { this.client = new MockClient(time); this.metadata = new Metadata(0, Long.MAX_VALUE); this.metadata.update(cluster, time.milliseconds()); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.configStorage = PowerMock.createMock(KafkaConfigStorage.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c34df15/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index b857315..ef76ffc 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -20,7 +20,7 @@ import kafka.common.KafkaException import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} import kafka.utils.Logging import org.apache.kafka.clients._ -import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException} +import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture} import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.errors.DisconnectException @@ -43,21 +43,15 @@ class AdminClient(val time: Time, private def send(target: Node, api: ApiKeys, request: AbstractRequest): Struct = { - var now = time.milliseconds() - val deadline = now + requestTimeoutMs var future: RequestFuture[ClientResponse] = null - do { - future = client.send(target, api, request) - client.poll(future) + future = client.send(target, api, request) + client.poll(future) - if (future.succeeded()) - return future.value().responseBody() - - now = time.milliseconds() - } while (now < deadline && future.exception().isInstanceOf[SendFailedException]) - - throw future.exception() + if (future.succeeded()) + return future.value().responseBody() + else + throw future.exception() } private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = { @@ -244,7 +238,8 @@ object AdminClient { networkClient, metadata, time, - DefaultRetryBackoffMs) + DefaultRetryBackoffMs, + DefaultRequestTimeoutMs) new AdminClient( time,
