[ https://issues.apache.org/jira/browse/KAFKA-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511784#comment-16511784 ]
ASF GitHub Bot commented on KAFKA-7050: --------------------------------------- hachikuji closed pull request #5203: KAFKA-7050: Decrease default consumer request timeout to 30s URL: https://github.com/apache/kafka/pull/5203 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index 9b62946fb96..7b44ca3ad3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -31,6 +31,7 @@ private final String clientId; private final long createdTimeMs; private final boolean expectResponse; + private final int requestTimeoutMs; private final RequestCompletionHandler callback; /** @@ -48,6 +49,7 @@ public ClientRequest(String destination, String clientId, long createdTimeMs, boolean expectResponse, + int requestTimeoutMs, RequestCompletionHandler callback) { this.destination = destination; this.requestBuilder = requestBuilder; @@ -55,6 +57,7 @@ public ClientRequest(String destination, this.clientId = clientId; this.createdTimeMs = createdTimeMs; this.expectResponse = expectResponse; + this.requestTimeoutMs = requestTimeoutMs; this.callback = callback; } @@ -101,4 +104,8 @@ public long createdTimeMs() { public int correlationId() { return correlationId; } + + public int requestTimeoutMs() { + return requestTimeoutMs; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 5caee2d4c87..5b7ba611714 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -17,11 +17,11 @@ package org.apache.kafka.clients; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -162,25 +162,28 @@ public boolean isEmpty() { } } + private Boolean hasExpiredRequest(long now, Deque<NetworkClient.InFlightRequest> deque) { + for (NetworkClient.InFlightRequest request : deque) { + long timeSinceSend = Math.max(0, now - request.sendTimeMs); + if (timeSinceSend > request.requestTimeoutMs) + return true; + } + return false; + } + /** * Returns a list of nodes with pending in-flight request, that need to be timed out * * @param now current time in milliseconds - * @param requestTimeoutMs max time to wait for the request to be completed * @return list of nodes */ - public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs) { - List<String> nodeIds = new LinkedList<>(); + public List<String> nodesWithTimedOutRequests(long now) { + List<String> nodeIds = new ArrayList<>(); for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) { String nodeId = requestEntry.getKey(); Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue(); - - if (!deque.isEmpty()) { - NetworkClient.InFlightRequest request = deque.peekLast(); - long timeSinceSend = now - request.sendTimeMs; - if (timeSinceSend > requestTimeoutMs) - nodeIds.add(nodeId); - } + if (hasExpiredRequest(now, deque)) + nodeIds.add(nodeId); } return nodeIds; } diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 49bf3a3eab9..448932e358b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -185,9 +185,16 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request * @param requestBuilder the request builder to use * @param createdTimeMs the time in milliseconds to use as the creation time of the request * @param expectResponse true iff we expect a response + * @param requestTimeoutMs Upper bound time in milliseconds to await a response before disconnecting the socket and + * cancelling the request. The request may get cancelled sooner if the socket disconnects + * for any reason including if another pending request to the same node timed out first. * @param callback the callback to invoke when we get a response */ - ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, - boolean expectResponse, RequestCompletionHandler callback); + ClientRequest newClientRequest(String nodeId, + AbstractRequest.Builder<?> requestBuilder, + long createdTimeMs, + boolean expectResponse, + int requestTimeoutMs, + RequestCompletionHandler callback); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 7c87277eb8f..619f7bdbdac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -88,8 +88,8 @@ /* the current correlation id to use when sending requests to servers */ private int correlation; - /* max time in ms for the producer to wait for acknowledgement from server*/ - private final int requestTimeoutMs; + /* default timeout for individual requests to await acknowledgement from servers */ + private final int defaultRequestTimeoutMs; /* time in ms to wait before retrying to create connection to a server */ private final long reconnectBackoffMs; @@ -117,15 +117,26 @@ public NetworkClient(Selectable selector, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs, + int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext) { - this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, - reconnectBackoffMs, reconnectBackoffMax, - socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, - discoverBrokerVersions, apiVersions, null, logContext); + this(null, + metadata, + selector, + clientId, + maxInFlightRequestsPerConnection, + reconnectBackoffMs, + reconnectBackoffMax, + socketSendBuffer, + socketReceiveBuffer, + defaultRequestTimeoutMs, + time, + discoverBrokerVersions, + apiVersions, + null, + logContext); } public NetworkClient(Selectable selector, @@ -136,16 +147,27 @@ public NetworkClient(Selectable selector, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs, + int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext) { - this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, - reconnectBackoffMs, reconnectBackoffMax, - socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, - discoverBrokerVersions, apiVersions, throttleTimeSensor, logContext); + this(null, + metadata, + selector, + clientId, + maxInFlightRequestsPerConnection, + reconnectBackoffMs, + reconnectBackoffMax, + socketSendBuffer, + socketReceiveBuffer, + defaultRequestTimeoutMs, + time, + discoverBrokerVersions, + apiVersions, + throttleTimeSensor, + logContext); } public NetworkClient(Selectable selector, @@ -156,15 +178,26 @@ public NetworkClient(Selectable selector, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs, + int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, LogContext logContext) { - this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, - reconnectBackoffMs, reconnectBackoffMax, - socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, - discoverBrokerVersions, apiVersions, null, logContext); + this(metadataUpdater, + null, + selector, + clientId, + maxInFlightRequestsPerConnection, + reconnectBackoffMs, + reconnectBackoffMax, + socketSendBuffer, + socketReceiveBuffer, + defaultRequestTimeoutMs, + time, + discoverBrokerVersions, + apiVersions, + null, + logContext); } private NetworkClient(MetadataUpdater metadataUpdater, @@ -176,7 +209,7 @@ private NetworkClient(MetadataUpdater metadataUpdater, long reconnectBackoffMax, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs, + int defaultRequestTimeoutMs, Time time, boolean discoverBrokerVersions, ApiVersions apiVersions, @@ -201,7 +234,7 @@ private NetworkClient(MetadataUpdater metadataUpdater, this.socketReceiveBuffer = socketReceiveBuffer; this.correlation = 0; this.randOffset = new Random(); - this.requestTimeoutMs = requestTimeoutMs; + this.defaultRequestTimeoutMs = defaultRequestTimeoutMs; this.reconnectBackoffMs = reconnectBackoffMs; this.time = time; this.discoverBrokerVersions = discoverBrokerVersions; @@ -426,31 +459,28 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long } private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) { - String nodeId = clientRequest.destination(); + String destination = clientRequest.destination(); RequestHeader header = clientRequest.makeHeader(request.version()); if (log.isDebugEnabled()) { int latestClientVersion = clientRequest.apiKey().latestVersion(); if (header.apiVersion() == latestClientVersion) { log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request, - clientRequest.correlationId(), nodeId); + clientRequest.correlationId(), destination); } else { log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}", - header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId); + header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination); } } - Send send = request.toSend(nodeId, header); + Send send = request.toSend(destination, header); InFlightRequest inFlightRequest = new InFlightRequest( + clientRequest, header, - clientRequest.createdTimeMs(), - clientRequest.destination(), - clientRequest.callback(), - clientRequest.expectResponse(), isInternalRequest, request, send, now); this.inFlightRequests.add(inFlightRequest); - selector.send(inFlightRequest.send); + selector.send(send); } /** @@ -475,7 +505,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long long metadataTimeout = metadataUpdater.maybeUpdate(now); try { - this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); + this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } @@ -658,7 +688,7 @@ else if (request.header.apiKey() == ApiKeys.METADATA) * @param now The current time */ private void handleTimedOutRequests(List<ClientResponse> responses, long now) { - List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs); + List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now); for (String nodeId : nodeIds) { // close connection to the node this.selector.close(nodeId); @@ -868,7 +898,7 @@ public boolean isUpdateDue(long now) { public long maybeUpdate(long now) { // should we update our metadata? long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0; + long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0; long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); @@ -965,7 +995,7 @@ private long maybeUpdate(long now, Node node) { log.debug("Sending metadata request {} to node {}", metadataRequest, node); sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); - return requestTimeoutMs; + return defaultRequestTimeoutMs; } // If there's any connection establishment underway, wait until it completes. This prevents @@ -993,16 +1023,26 @@ private long maybeUpdate(long now, Node node) { } @Override - public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, + public ClientRequest newClientRequest(String nodeId, + AbstractRequest.Builder<?> requestBuilder, + long createdTimeMs, boolean expectResponse) { - return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null); + return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null); } @Override - public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, - boolean expectResponse, RequestCompletionHandler callback) { + public ClientRequest newClientRequest(String nodeId, + AbstractRequest.Builder<?> requestBuilder, + long createdTimeMs, + boolean expectResponse, + int requestTimeoutMs, + RequestCompletionHandler callback) { return new ClientRequest(nodeId, requestBuilder, correlation++, clientId, createdTimeMs, expectResponse, - callback); + defaultRequestTimeoutMs, callback); + } + + public boolean discoverBrokerVersions() { + return discoverBrokerVersions; } static class InFlightRequest { @@ -1015,8 +1055,28 @@ public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> final Send send; final long sendTimeMs; final long createdTimeMs; + final long requestTimeoutMs; + + public InFlightRequest(ClientRequest clientRequest, + RequestHeader header, + boolean isInternalRequest, + AbstractRequest request, + Send send, + long sendTimeMs) { + this(header, + clientRequest.requestTimeoutMs(), + clientRequest.createdTimeMs(), + clientRequest.destination(), + clientRequest.callback(), + clientRequest.expectResponse(), + isInternalRequest, + request, + send, + sendTimeMs); + } public InFlightRequest(RequestHeader header, + int requestTimeoutMs, long createdTimeMs, String destination, RequestCompletionHandler callback, @@ -1026,6 +1086,8 @@ public InFlightRequest(RequestHeader header, Send send, long sendTimeMs) { this.header = header; + this.requestTimeoutMs = requestTimeoutMs; + this.createdTimeMs = createdTimeMs; this.destination = destination; this.callback = callback; this.expectResponse = expectResponse; @@ -1033,7 +1095,6 @@ public InFlightRequest(RequestHeader header, this.request = request; this.send = send; this.sendTimeMs = sendTimeMs; - this.createdTimeMs = createdTimeMs; } public ClientResponse completed(AbstractResponse response, long timeMs) { @@ -1060,7 +1121,4 @@ public String toString() { } } - public boolean discoverBrokerVersions() { - return discoverBrokerVersions; - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index bc9a716158e..fefeae343e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -403,7 +403,7 @@ VALUE_DESERIALIZER_CLASS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, - 305000, // chosen to be higher than the default of max.poll.interval.ms + 30000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d6973c0a818..76e0fcc9ba6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -668,10 +668,11 @@ private KafkaConsumer(ConsumerConfig config, log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); - int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); - int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); - if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs) - throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); + int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); + if (this.requestTimeoutMs < sessionTimeoutMs) + throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " (" + requestTimeoutMs + + ") cannot be lower than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + + " (" + sessionTimeoutMs + ")"); this.time = Time.SYSTEM; Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index adbaae776ab..8f681380567 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -319,7 +319,11 @@ public void ensureActiveGroup() { * @return true iff the group is active */ boolean ensureActiveGroup(final long timeoutMs) { - final long startTime = time.milliseconds(); + return ensureActiveGroup(timeoutMs, time.milliseconds()); + } + + // Visible for testing + boolean ensureActiveGroup(long timeoutMs, long startMs) { // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. if (!ensureCoordinatorReady(timeoutMs)) { @@ -328,7 +332,9 @@ boolean ensureActiveGroup(final long timeoutMs) { startHeartbeatThreadIfNeeded(); - return joinGroupIfNeeded(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startTime)); + long joinStartMs = time.milliseconds(); + long joinTimeoutMs = remainingTimeAtLeastZero(timeoutMs, joinStartMs - startMs); + return joinGroupIfNeeded(joinTimeoutMs, joinStartMs); } private synchronized void startHeartbeatThreadIfNeeded() { @@ -366,17 +372,17 @@ private void closeHeartbeatThread() { * Visible for testing. * * @param timeoutMs Time to complete this action + * @param startTimeMs Current time when invoked * @return true iff the operation succeeded */ - boolean joinGroupIfNeeded(final long timeoutMs) { - final long startTime = time.milliseconds(); + boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) { long elapsedTime = 0L; while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { return false; } - elapsedTime = time.milliseconds() - startTime; + elapsedTime = time.milliseconds() - startTimeMs; // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second // time if the client is woken up before a pending rebalance completes. This must be called @@ -415,7 +421,7 @@ else if (!future.isRetriable()) } if (rejoinNeededOrPending()) { - elapsedTime = time.milliseconds() - startTime; + elapsedTime = time.milliseconds() - startTimeMs; } } return true; @@ -473,9 +479,12 @@ public void onFailure(RuntimeException e) { * Join the group and return the assignment for the next generation. This function handles both * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if * elected leader by the coordinator. + * + * NOTE: This is visible only for testing + * * @return A request future which wraps the assignment returned from the group leader */ - private RequestFuture<ByteBuffer> sendJoinGroupRequest() { + RequestFuture<ByteBuffer> sendJoinGroupRequest() { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); @@ -489,7 +498,12 @@ public void onFailure(RuntimeException e) { metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs); log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator); - return client.send(coordinator, requestBuilder) + + // Note that we override the request timeout using the rebalance timeout since that is the + // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays. + + int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000); + return client.send(coordinator, requestBuilder, joinGroupTimeoutMs) .compose(new JoinGroupResponseHandler()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index a9e167a9a2e..0bf0aad1b94 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -62,7 +62,7 @@ private final Time time; private final long retryBackoffMs; private final int maxPollTimeoutMs; - private final long unsentExpiryMs; + private final int requestTimeoutMs; private final AtomicBoolean wakeupDisabled = new AtomicBoolean(); // We do not need high throughput, so use a fair lock to try to avoid starvation @@ -83,7 +83,7 @@ public ConsumerNetworkClient(LogContext logContext, Metadata metadata, Time time, long retryBackoffMs, - long requestTimeoutMs, + int requestTimeoutMs, int maxPollTimeoutMs) { this.log = logContext.logger(ConsumerNetworkClient.class); this.client = client; @@ -91,7 +91,15 @@ public ConsumerNetworkClient(LogContext logContext, this.time = time; this.retryBackoffMs = retryBackoffMs; this.maxPollTimeoutMs = Math.min(maxPollTimeoutMs, MAX_POLL_TIMEOUT_MS); - this.unsentExpiryMs = requestTimeoutMs; + this.requestTimeoutMs = requestTimeoutMs; + } + + + /** + * Send a request with the default timeout. See {@link #send(Node, AbstractRequest.Builder, int)}. + */ + public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) { + return send(node, requestBuilder, requestTimeoutMs); } /** @@ -104,13 +112,18 @@ public ConsumerNetworkClient(LogContext logContext, * * @param node The destination of the request * @param requestBuilder A builder for the request payload + * @param requestTimeoutMs Maximum time in milliseconds to await a response before disconnecting the socket and + * cancelling the request. The request may be cancelled sooner if the socket disconnects + * for any reason. * @return A future which indicates the result of the send. */ - public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) { + public RequestFuture<ClientResponse> send(Node node, + AbstractRequest.Builder<?> requestBuilder, + int requestTimeoutMs) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, - completionHandler); + requestTimeoutMs, completionHandler); unsent.put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request @@ -136,13 +149,6 @@ public boolean hasReadyNodes(long now) { } } - /** - * Block until the metadata has been refreshed. - */ - public void awaitMetadataUpdate() { - awaitMetadataUpdate(Long.MAX_VALUE); - } - /** * Block waiting on the metadata refresh with a timeout. * @@ -444,10 +450,10 @@ public void disconnectAsync(Node node) { private void failExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures - Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs); + Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now); for (ClientRequest request : expiredRequests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); - handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); + handler.onFailure(new TimeoutException("Failed to send request after " + request.requestTimeoutMs() + " ms.")); } } @@ -655,13 +661,14 @@ public boolean hasRequests() { return false; } - public Collection<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) { + private Collection<ClientRequest> removeExpiredRequests(long now) { List<ClientRequest> expiredRequests = new ArrayList<>(); for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) { Iterator<ClientRequest> requestIterator = requests.iterator(); while (requestIterator.hasNext()) { ClientRequest request = requestIterator.next(); - if (request.createdTimeMs() < now - unsentExpiryMs) { + long elapsedMs = Math.max(0, now - request.createdTimeMs()); + if (elapsedMs > request.requestTimeoutMs()) { expiredRequests.add(request); requestIterator.remove(); } else diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7c941796ce4..5cb904691ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -109,7 +109,7 @@ private final SenderMetrics sensors; /* the max time to wait for the server to respond to the request*/ - private final int requestTimeout; + private final int requestTimeoutMs; /* The max time to wait before retrying a request which has failed */ private final long retryBackoffMs; @@ -130,7 +130,7 @@ public Sender(LogContext logContext, int retries, SenderMetricsRegistry metricsRegistry, Time time, - int requestTimeout, + int requestTimeoutMs, long retryBackoffMs, TransactionManager transactionManager, ApiVersions apiVersions) { @@ -145,7 +145,7 @@ public Sender(LogContext logContext, this.retries = retries; this.time = time; this.sensors = new SenderMetrics(metricsRegistry); - this.requestTimeout = requestTimeout; + this.requestTimeoutMs = requestTimeoutMs; this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; this.transactionManager = transactionManager; @@ -280,7 +280,7 @@ private long sendProducerData(long now) { } } - List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now); + List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now); // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why // we need to reset the producer id here. @@ -342,12 +342,12 @@ private boolean maybeSendTransactionalRequest(long now) { break; } - if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) { + if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeoutMs)) { transactionManager.lookupCoordinator(nextRequestHandler); break; } } else { - targetNode = awaitLeastLoadedNodeReady(requestTimeout); + targetNode = awaitLeastLoadedNodeReady(requestTimeoutMs); } if (targetNode != null) { @@ -355,7 +355,7 @@ private boolean maybeSendTransactionalRequest(long now) { time.sleep(nextRequestHandler.retryBackoffMs()); ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), - requestBuilder, now, true, nextRequestHandler); + requestBuilder, now, true, requestTimeoutMs, nextRequestHandler); transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId()); log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode); @@ -409,7 +409,7 @@ public void forceClose() { private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException { String nodeId = node.idString(); InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null); - ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null); + ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null); return NetworkClientUtils.sendAndReceive(client, request, time); } @@ -424,7 +424,7 @@ private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException private void maybeWaitForProducerId() { while (!transactionManager.hasProducerId() && !transactionManager.hasError()) { try { - Node node = awaitLeastLoadedNodeReady(requestTimeout); + Node node = awaitLeastLoadedNodeReady(requestTimeoutMs); if (node != null) { ClientResponse response = sendAndAwaitInitProducerIdRequest(node); InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody(); @@ -652,7 +652,7 @@ private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse */ private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) { for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet()) - sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()); + sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue()); } /** @@ -702,7 +702,8 @@ public void onComplete(ClientResponse response) { }; String nodeId = Integer.toString(destination); - ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback); + ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, + requestTimeoutMs, callback); client.send(clientRequest, now); log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); } diff --git a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java index 600e5dc9053..c7b9eb903c8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java @@ -19,10 +19,14 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.junit.Assert.assertEquals; @@ -64,6 +68,24 @@ public void testClearAll() { assertEquals(correlationId2, clearedRequests.get(1).header.correlationId()); } + @Test + public void testTimedOutNodes() { + Time time = new MockTime(); + + addRequest("A", time.milliseconds(), 50); + addRequest("B", time.milliseconds(), 200); + addRequest("B", time.milliseconds(), 100); + + time.sleep(50); + assertEquals(Collections.emptyList(), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds())); + + time.sleep(25); + assertEquals(Collections.singletonList("A"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds())); + + time.sleep(50); + assertEquals(Arrays.asList("A", "B"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds())); + } + @Test public void testCompleteNext() { int correlationId1 = addRequest(dest); @@ -88,12 +110,16 @@ public void testCompleteLastSentThrowsIfNoInFlights() { } private int addRequest(String destination) { + return addRequest(destination, 0, 10000); + } + + private int addRequest(String destination, long sendTimeMs, int requestTimeoutMs) { int correlationId = this.correlationId; this.correlationId += 1; RequestHeader requestHeader = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", correlationId); - NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, 0, - destination, null, false, false, null, null, 0); + NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, requestTimeoutMs, 0, + destination, null, false, false, null, null, sendTimeMs); inFlightRequests.add(ifr); return correlationId; } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 2a1e21375b9..e82b0ddd67b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -83,6 +83,8 @@ public FutureResponse(Node node, private final TransientSet<Node> blackedOut; // Nodes which will always fail to connect, but can be chosen by leastLoadedNode private final TransientSet<Node> unreachable; + // Nodes which have a delay before ultimately succeeding to connect + private final TransientSet<Node> delayedReady; private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>(); private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>(); @@ -105,6 +107,7 @@ public MockClient(Time time, Metadata metadata) { this.unavailableTopics = Collections.emptySet(); this.blackedOut = new TransientSet<>(time); this.unreachable = new TransientSet<>(time); + this.delayedReady = new TransientSet<>(time); } @Override @@ -122,6 +125,9 @@ public boolean ready(Node node, long now) { return false; } + if (delayedReady.contains(node, now)) + return false; + ready.add(node.idString()); return true; } @@ -145,6 +151,10 @@ public void setUnreachable(Node node, long durationMs) { unreachable.add(node, durationMs); } + public void delayReady(Node node, long durationMs) { + delayedReady.add(node, durationMs); + } + public void authenticationFailed(Node node, long blackoutMs) { pendingAuthenticationErrors.remove(node); authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception()); @@ -267,6 +277,7 @@ private synchronized void maybeAwaitWakeup() { @Override public List<ClientResponse> poll(long timeoutMs, long now) { maybeAwaitWakeup(); + checkTimeoutOfPendingRequests(now); List<ClientResponse> copy = new ArrayList<>(this.responses); @@ -296,6 +307,19 @@ private synchronized void maybeAwaitWakeup() { return copy; } + private long elapsedTimeMs(long currentTimeMs, long startTimeMs) { + return Math.max(0, currentTimeMs - startTimeMs); + } + + private void checkTimeoutOfPendingRequests(long nowMs) { + ClientRequest request = requests.peek(); + while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) { + disconnect(request.destination()); + requests.poll(); + request = requests.peek(); + } + } + public Queue<ClientRequest> requests() { return this.requests; } @@ -493,14 +517,18 @@ public boolean hasReadyNodes(long now) { @Override public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse) { - return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null); + return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null); } @Override - public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, - boolean expectResponse, RequestCompletionHandler callback) { + public ClientRequest newClientRequest(String nodeId, + AbstractRequest.Builder<?> requestBuilder, + long createdTimeMs, + boolean expectResponse, + int requestTimeoutMs, + RequestCompletionHandler callback) { return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs, - expectResponse, callback); + expectResponse, requestTimeoutMs, callback); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index f83226c42ac..e13fcefaec9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -47,11 +47,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class NetworkClientTest { - protected final int requestTimeoutMs = 1000; + protected final int minRequestTimeoutMs = 1000; protected final MockTime time = new MockTime(); protected final MockSelector selector = new MockSelector(time); protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); @@ -69,19 +70,19 @@ private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) { return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, - requestTimeoutMs, time, true, new ApiVersions(), new LogContext()); + minRequestTimeoutMs, time, true, new ApiVersions(), new LogContext()); } private NetworkClient createNetworkClientWithStaticNodes() { return new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)), - "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, + "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, minRequestTimeoutMs, time, true, new ApiVersions(), new LogContext()); } private NetworkClient createNetworkClientWithNoVersionDiscovery() { return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, - 64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions(), new LogContext()); + 64 * 1024, 64 * 1024, minRequestTimeoutMs, time, false, new ApiVersions(), new LogContext()); } @Before @@ -140,10 +141,10 @@ public void testClose() { private void checkSimpleRequestResponse(NetworkClient networkClient) { awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.<TopicPartition, MemoryRecords>emptyMap()); + Collections.emptyMap()); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = networkClient.newClientRequest( - node.idString(), builder, time.milliseconds(), true, handler); + node.idString(), builder, time.milliseconds(), true, minRequestTimeoutMs, handler); networkClient.send(request, time.milliseconds()); networkClient.poll(1, time.milliseconds()); assertEquals(1, networkClient.inFlightRequestCount()); @@ -184,16 +185,28 @@ private void awaitReady(NetworkClient client, Node node) { public void testRequestTimeout() { awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, - 1000, Collections.<TopicPartition, MemoryRecords>emptyMap()); + 1000, Collections.emptyMap()); TestCallbackHandler handler = new TestCallbackHandler(); - long now = time.milliseconds(); - ClientRequest request = client.newClientRequest( - node.idString(), builder, now, true, handler); - client.send(request, now); + int requestTimeoutMs = minRequestTimeoutMs + 5000; + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, + requestTimeoutMs, handler); + testRequestTimeout(request); + } + + @Test + public void testMinRequestTimeout() { + awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 + ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, + 1000, Collections.emptyMap()); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); + testRequestTimeout(request); + } - // sleeping to make sure that the time since last send is greater than requestTimeOut - time.sleep(3000); - List<ClientResponse> responses = client.poll(3000, time.milliseconds()); + private void testRequestTimeout(ClientRequest request) { + client.send(request, time.milliseconds()); + + time.sleep(request.requestTimeoutMs() + 1); + List<ClientResponse> responses = client.poll(0, time.milliseconds()); assertEquals(1, responses.size()); ClientResponse clientResponse = responses.get(0); @@ -206,9 +219,10 @@ public void testConnectionThrottling() { // Instrument the test to return a response with a 100ms throttle delay. awaitReady(client, node); ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.<TopicPartition, MemoryRecords>emptyMap()); + Collections.emptyMap()); TestCallbackHandler handler = new TestCallbackHandler(); - ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, + minRequestTimeoutMs, handler); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); ResponseHeader respHeader = new ResponseHeader(request.correlationId()); @@ -222,7 +236,7 @@ public void testConnectionThrottling() { resp.writeTo(buffer); buffer.flip(); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); - List<ClientResponse> responses = client.poll(1, time.milliseconds()); + client.poll(1, time.milliseconds()); // The connection is not ready due to throttling. assertFalse(client.ready(node, time.milliseconds())); @@ -264,9 +278,10 @@ public void testThrottlingNotEnabledForConnectionToOlderBroker() { selector.clear(); ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.<TopicPartition, MemoryRecords>emptyMap()); + Collections.emptyMap()); TestCallbackHandler handler = new TestCallbackHandler(); - ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, handler); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, + minRequestTimeoutMs, handler); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); ResponseHeader respHeader = new ResponseHeader(request.correlationId()); @@ -280,7 +295,7 @@ public void testThrottlingNotEnabledForConnectionToOlderBroker() { resp.writeTo(buffer); buffer.flip(); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); - List<ClientResponse> responses = client.poll(1, time.milliseconds()); + client.poll(1, time.milliseconds()); // Since client-side throttling is disabled, the connection is ready even though the response indicated a // throttle delay. @@ -308,7 +323,7 @@ public void testLeastLoadedNode() { client.poll(1, time.milliseconds()); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); leastNode = client.leastLoadedNode(time.milliseconds()); - assertEquals("There should be NO leastloadednode", leastNode, null); + assertNull("There should be NO leastloadednode", leastNode); } @Test @@ -334,7 +349,7 @@ public void testConnectionDelayDisconnectedWithNoExponentialBackoff() { awaitReady(clientWithNoExponentialBackoff, node); selector.serverDisconnect(node.idString()); - clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds()); + clientWithNoExponentialBackoff.poll(minRequestTimeoutMs, time.milliseconds()); long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds()); assertEquals(reconnectBackoffMsTest, delay); @@ -346,7 +361,7 @@ public void testConnectionDelayDisconnectedWithNoExponentialBackoff() { // Start connecting and disconnect before the connection is established client.ready(node, time.milliseconds()); selector.serverDisconnect(node.idString()); - client.poll(requestTimeoutMs, time.milliseconds()); + client.poll(minRequestTimeoutMs, time.milliseconds()); // Second attempt should have the same behaviour as exponential backoff is disabled assertEquals(reconnectBackoffMsTest, delay); @@ -376,7 +391,7 @@ public void testConnectionDelayDisconnected() { // First disconnection selector.serverDisconnect(node.idString()); - client.poll(requestTimeoutMs, time.milliseconds()); + client.poll(minRequestTimeoutMs, time.milliseconds()); long delay = client.connectionDelay(node, time.milliseconds()); long expectedDelay = reconnectBackoffMsTest; double jitter = 0.3; @@ -389,7 +404,7 @@ public void testConnectionDelayDisconnected() { // Start connecting and disconnect before the connection is established client.ready(node, time.milliseconds()); selector.serverDisconnect(node.idString()); - client.poll(requestTimeoutMs, time.milliseconds()); + client.poll(minRequestTimeoutMs, time.milliseconds()); // Second attempt should take twice as long with twice the jitter expectedDelay = Math.round(delay * 2); @@ -408,13 +423,13 @@ public void testDisconnectDuringUserMetadataRequest() { long now = time.milliseconds(); ClientRequest request = client.newClientRequest(node.idString(), builder, now, true); client.send(request, now); - client.poll(requestTimeoutMs, now); + client.poll(minRequestTimeoutMs, now); assertEquals(1, client.inFlightRequestCount(node.idString())); assertTrue(client.hasInFlightRequests(node.idString())); assertTrue(client.hasInFlightRequests()); selector.close(node.idString()); - List<ClientResponse> responses = client.poll(requestTimeoutMs, time.milliseconds()); + List<ClientResponse> responses = client.poll(minRequestTimeoutMs, time.milliseconds()); assertEquals(1, responses.size()); assertTrue(responses.iterator().next().wasDisconnected()); } @@ -442,7 +457,7 @@ public void testClientDisconnectAfterInternalApiVersionRequest() throws Exceptio } @Test - public void testDisconnectWithMultipleInFlights() throws Exception { + public void testDisconnectWithMultipleInFlights() { NetworkClient client = this.clientWithNoVersionDiscovery; awaitReady(client, node); assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(), @@ -459,11 +474,11 @@ public void onComplete(ClientResponse response) { } }; - ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, callback); + ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback); client.send(request1, now); client.poll(0, now); - ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, callback); + ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, minRequestTimeoutMs, callback); client.send(request2, now); client.poll(0, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index b8681e8f1bc..97ec08209aa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1747,7 +1747,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, String groupId = "mock-group"; String metricGroupPrefix = "consumer"; long retryBackoffMs = 100; - long requestTimeoutMs = 30000; + int requestTimeoutMs = 30000; int defaultApiTimeoutMs = 30000; boolean excludeInternalTopics = true; int minBytes = 1; @@ -1762,7 +1762,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, Deserializer<String> valueDeserializer = new StringDeserializer(); List<PartitionAssignor> assignors = singletonList(assignor); - ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList()); + ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList()); Metrics metrics = new Metrics(); ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 32aae442388..f88e72505a4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -45,11 +45,16 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -59,9 +64,9 @@ private static final int REBALANCE_TIMEOUT_MS = 60000; private static final int SESSION_TIMEOUT_MS = 10000; private static final int HEARTBEAT_INTERVAL_MS = 3000; - private static final long RETRY_BACKOFF_MS = 20; - private static final long LONG_RETRY_BACKOFF_MS = 10000; - private static final long REQUEST_TIMEOUT_MS = 40000; + private static final int RETRY_BACKOFF_MS = 100; + private static final int LONG_RETRY_BACKOFF_MS = 10000; + private static final int REQUEST_TIMEOUT_MS = 40000; private static final String GROUP_ID = "dummy-group"; private static final String METRIC_GROUP_PREFIX = "consumer"; @@ -72,27 +77,35 @@ private ConsumerNetworkClient consumerClient; private DummyCoordinator coordinator; - private void setupCoordinator(long retryBackoffMs) { + private void setupCoordinator() { + setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS); + } + + private void setupCoordinator(int retryBackoffMs) { + setupCoordinator(retryBackoffMs, REBALANCE_TIMEOUT_MS); + } + + private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) { this.mockTime = new MockTime(); this.mockClient = new MockClient(mockTime); - Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true); + Metadata metadata = new Metadata(retryBackoffMs, 60 * 60 * 1000L, true); this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime, retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS); Metrics metrics = new Metrics(); Cluster cluster = TestUtils.singletonCluster("topic", 1); - metadata.update(cluster, Collections.<String>emptySet(), mockTime.milliseconds()); + metadata.update(cluster, Collections.emptySet(), mockTime.milliseconds()); this.node = cluster.nodes().get(0); mockClient.setNode(node); this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); - this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime); + this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs); } @Test public void testCoordinatorDiscoveryBackoff() { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); @@ -108,9 +121,69 @@ public void testCoordinatorDiscoveryBackoff() { assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS); } + @Test + public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { + setupCoordinator(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(0); + + ExecutorService executor = Executors.newFixedThreadPool(1); + try { + long firstAttemptStartMs = mockTime.milliseconds(); + Future<Boolean> firstAttempt = executor.submit(() -> + coordinator.joinGroupIfNeeded(REQUEST_TIMEOUT_MS, firstAttemptStartMs)); + + mockTime.sleep(REQUEST_TIMEOUT_MS); + assertFalse(firstAttempt.get()); + assertTrue(consumerClient.hasPendingRequests(coordinatorNode)); + + mockClient.respond(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + long secondAttemptMs = mockTime.milliseconds(); + Future<Boolean> secondAttempt = executor.submit(() -> + coordinator.joinGroupIfNeeded(REQUEST_TIMEOUT_MS, secondAttemptMs)); + + assertTrue(secondAttempt.get()); + } finally { + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + } + + @Test + public void testJoinGroupRequestTimeout() { + setupCoordinator(RETRY_BACKOFF_MS, REBALANCE_TIMEOUT_MS); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(0); + + RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest(); + + mockTime.sleep(REQUEST_TIMEOUT_MS + 1); + assertFalse(consumerClient.poll(future, 0)); + + mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 5000); + assertTrue(consumerClient.poll(future, 0)); + } + + @Test + public void testJoinGroupRequestMaxTimeout() { + // Ensure we can handle the maximum allowed rebalance timeout + + setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(0); + + RequestFuture<ByteBuffer> future = coordinator.sendJoinGroupRequest(); + assertFalse(consumerClient.poll(future, 0)); + + mockTime.sleep(Integer.MAX_VALUE + 1L); + assertTrue(consumerClient.poll(future, 0)); + } + @Test public void testUncaughtExceptionInHeartbeatThread() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); @@ -170,8 +243,8 @@ public boolean matches(AbstractRequest body) { } @Test - public void testLookupCoordinator() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + public void testLookupCoordinator() { + setupCoordinator(); mockClient.setNode(null); RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator(); @@ -180,16 +253,16 @@ public void testLookupCoordinator() throws Exception { mockClient.setNode(node); RequestFuture<Void> future = coordinator.lookupCoordinator(); assertFalse("Request not sent", future.isDone()); - assertTrue("New request sent while one is in progress", future == coordinator.lookupCoordinator()); + assertSame("New request sent while one is in progress", future, coordinator.lookupCoordinator()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(Long.MAX_VALUE); - assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator()); + assertNotSame("New request not sent after previous completed", future, coordinator.lookupCoordinator()); } @Test public void testWakeupAfterJoinGroupSent() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -227,7 +300,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterJoinGroupSentExternalCompletion() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -267,7 +340,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterJoinGroupReceived() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -303,7 +376,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(new MockClient.RequestMatcher() { @@ -341,7 +414,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterSyncGroupSent() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); @@ -379,7 +452,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); @@ -419,7 +492,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterSyncGroupReceived() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); @@ -455,7 +528,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); @@ -491,7 +564,7 @@ public boolean matches(AbstractRequest body) { @Test public void testWakeupInOnJoinComplete() throws Exception { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); coordinator.wakeupOnJoinComplete = true; mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); @@ -522,7 +595,7 @@ public void testWakeupInOnJoinComplete() throws Exception { @Test public void testAuthenticationErrorInEnsureCoordinatorReady() { - setupCoordinator(RETRY_BACKOFF_MS); + setupCoordinator(); mockClient.createPendingAuthenticationError(node, 300); @@ -583,9 +656,11 @@ private SyncGroupResponse syncGroupResponse(Errors error) { public DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, - Time time) { - super(new LogContext(), client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, - HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false); + Time time, + int rebalanceTimeoutMs, + int retryBackoffMs) { + super(new LogContext(), client, GROUP_ID, rebalanceTimeoutMs, SESSION_TIMEOUT_MS, + HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, false); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 32da34a1661..bd0038d2be2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -100,6 +100,7 @@ private int heartbeatIntervalMs = 5000; private long retryBackoffMs = 100; private int autoCommitIntervalMs = 2000; + private int requestTimeoutMs = 30000; private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor(); private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor); private MockTime time; @@ -126,7 +127,8 @@ public void setup() { this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); - this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 1000, Integer.MAX_VALUE); + this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, + requestTimeoutMs, Integer.MAX_VALUE); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.mockOffsetCommitCallback = new MockCommitCallback(); @@ -566,7 +568,7 @@ public boolean matches(AbstractRequest body) { } }, syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -603,9 +605,9 @@ public boolean matches(AbstractRequest body) { } }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE)); // expect client to force updating the metadata, if yes gives it both topics - client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); + client.prepareMetadataUpdate(cluster, Collections.emptySet()); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(2, subscriptions.assignedPartitions().size()); @@ -671,8 +673,8 @@ public void testUnexpectedErrorOnSyncGroup() { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); - client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); } @Test @@ -698,7 +700,7 @@ public boolean matches(AbstractRequest body) { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -721,7 +723,7 @@ public void testRebalanceInProgressOnSyncGroup() { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -750,7 +752,7 @@ public boolean matches(AbstractRequest body) { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -937,7 +939,7 @@ public void testRejoinGroup() { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertEquals(2, rebalanceListener.revokedCount); assertEquals(singleton(t1p), rebalanceListener.revoked); @@ -957,7 +959,7 @@ public void testDisconnectInJoin() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -975,7 +977,7 @@ public void testInvalidSessionTimeout() { // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); } @Test @@ -1132,7 +1134,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); subscriptions.seek(t1p, 100); @@ -1630,14 +1632,14 @@ public void testCloseCoordinatorNotKnownManualAssignment() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + closeVerifyTimeout(coordinator, 1000, 1000, 1000); } @Test public void testCloseCoordinatorNotKnownNoCommits() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR); - closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + closeVerifyTimeout(coordinator, 1000, 0, 0); } @Test @@ -1645,14 +1647,14 @@ public void testCloseCoordinatorNotKnownWithCommits() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + closeVerifyTimeout(coordinator, 1000, 1000, 1000); } @Test public void testCloseCoordinatorUnavailableNoCommits() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE); - closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + closeVerifyTimeout(coordinator, 1000, 0, 0); } @Test @@ -1660,7 +1662,7 @@ public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); + closeVerifyTimeout(coordinator, 1000, 1000, 1000); } @Test @@ -1668,27 +1670,27 @@ public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs); } @Test public void testCloseNoResponseForCommit() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs); } @Test public void testCloseNoResponseForLeaveGroup() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); - closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs); } @Test public void testCloseNoWait() throws Exception { ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); - closeVerifyTimeout(coordinator, 0, 60000, 0, 0); + closeVerifyTimeout(coordinator, 0, 0, 0); } @Test @@ -1698,7 +1700,7 @@ public void testHeartbeatThreadClose() throws Exception { coordinator.ensureActiveGroup(); time.sleep(heartbeatIntervalMs + 100); Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat - closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); + closeVerifyTimeout(coordinator, Long.MAX_VALUE, requestTimeoutMs, requestTimeoutMs); Thread[] threads = new Thread[Thread.activeCount()]; int threadCount = Thread.enumerate(threads); for (int i = 0; i < threadCount; i++) @@ -1736,7 +1738,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); } else subscriptions.assignFromUser(singleton(t1p)); @@ -1754,9 +1756,11 @@ private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors erro consumerClient.poll(0); assertTrue(coordinator.coordinatorUnknown()); } + private void closeVerifyTimeout(final ConsumerCoordinator coordinator, - final long closeTimeoutMs, final long requestTimeoutMs, - long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception { + final long closeTimeoutMs, + final long expectedMinTimeMs, + final long expectedMaxTimeMs) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); try { boolean coordinatorUnknown = coordinator.coordinatorUnknown(); @@ -1903,7 +1907,7 @@ private void joinAsFollowerAndReceiveAssignment(String consumerId, coordinator.ensureCoordinatorReady(Long.MAX_VALUE); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(Long.MAX_VALUE); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE, time.milliseconds()); } private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index d0888fa5655..d5ec3827244 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.HeartbeatRequest; @@ -125,6 +126,26 @@ public void testDisconnectWithInFlightRequests() { assertTrue(future.exception() instanceof DisconnectException); } + @Test + public void testTimeoutUnsentRequest() { + // Delay connection to the node so that the request remains unsent + client.delayReady(node, 1000); + + RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat(), 500); + consumerClient.pollNoWakeup(); + + // Ensure the request is pending, but hasn't been sent + assertTrue(consumerClient.hasPendingRequests()); + assertFalse(client.hasInFlightRequests()); + + time.sleep(501); + consumerClient.pollNoWakeup(); + + assertFalse(consumerClient.hasPendingRequests()); + assertTrue(future.failed()); + assertTrue(future.exception() instanceof TimeoutException); + } + @Test public void doNotBlockIfPollConditionIsSatisfied() { NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); @@ -175,7 +196,7 @@ public void blockOnlyForRetryBackoffIfNoInflightRequests() { NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class); ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), - mockNetworkClient, metadata, time, retryBackoffMs, 1000L, Integer.MAX_VALUE); + mockNetworkClient, metadata, time, retryBackoffMs, 1000, Integer.MAX_VALUE); EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0); EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList()); @@ -273,8 +294,8 @@ public void testAwaitForMetadataUpdateWithTimeout() { } @Test - public void sendExpiry() throws InterruptedException { - long unsentExpiryMs = 10; + public void sendExpiry() { + int requestTimeoutMs = 10; final AtomicBoolean isReady = new AtomicBoolean(); final AtomicBoolean disconnected = new AtomicBoolean(); client = new MockClient(time) { @@ -291,13 +312,13 @@ public boolean connectionFailed(Node node) { } }; // Queue first send, sleep long enough for this to expire and then queue second send - consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, unsentExpiryMs, Integer.MAX_VALUE); + consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, requestTimeoutMs, Integer.MAX_VALUE); RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat()); assertEquals(1, consumerClient.pendingRequestCount()); assertEquals(1, consumerClient.pendingRequestCount(node)); assertFalse(future1.isDone()); - time.sleep(unsentExpiryMs + 1); + time.sleep(requestTimeoutMs + 1); RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat()); assertEquals(2, consumerClient.pendingRequestCount()); assertEquals(2, consumerClient.pendingRequestCount(node)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 46666ca536e..4169550ef11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1392,7 +1392,7 @@ public void testGetTopicMetadataLeaderNotAvailable() { * Send multiple requests. Verify that the client side quota metrics have the right values */ @Test - public void testQuotaMetrics() throws Exception { + public void testQuotaMetrics() { MockSelector selector = new MockSelector(time); Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry); Cluster cluster = TestUtils.singletonCluster("test", 1); @@ -1413,8 +1413,8 @@ public void testQuotaMetrics() throws Exception { for (int i = 1; i <= 3; i++) { int throttleTimeMs = 100 * i; - FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<TopicPartition, PartitionData>()); - ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); + FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<>()); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 77005b70152..d87c8f9e894 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -263,8 +263,8 @@ public void testQuotaMetrics() throws Exception { for (int i = 1; i <= 3; i++) { int throttleTimeMs = 100 * i; ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, - Collections.<TopicPartition, MemoryRecords>emptyMap()); - ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); + Collections.emptyMap()); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 7312f10a04a..d847881bff4 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -460,7 +460,7 @@ object AdminClient { metadata, time, retryBackoffMs, - requestTimeoutMs.toLong, + requestTimeoutMs, Integer.MAX_VALUE) new AdminClient( diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index 60635f17073..aedaac79d27 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -39,7 +39,7 @@ abstract class InterBrokerSendThread(name: String, extends ShutdownableThread(name, isInterruptible) { def generateRequests(): Iterable[RequestAndCompletionHandler] - def unsentExpiryMs: Int + def requestTimeoutMs: Int private val unsentRequests = new UnsentRequests def hasUnsentRequests = unsentRequests.iterator().hasNext @@ -57,7 +57,8 @@ abstract class InterBrokerSendThread(name: String, generateRequests().foreach { request => val completionHandler = request.handler unsentRequests.put(request.destination, - networkClient.newClientRequest(request.destination.idString, request.request, now, true, completionHandler)) + networkClient.newClientRequest(request.destination.idString, request.request, now, true, + requestTimeoutMs, completionHandler)) } try { @@ -118,9 +119,9 @@ abstract class InterBrokerSendThread(name: String, private def failExpiredRequests(now: Long): Unit = { // clear all expired unsent requests - val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs) - for (request <- expiredRequests.asScala) { - debug(s"Failed to send the following request after $unsentExpiryMs ms: $request") + val timedOutRequests = unsentRequests.removeAllTimedOut(now) + for (request <- timedOutRequests.asScala) { + debug(s"Failed to send the following request after ${request.requestTimeoutMs} ms: $request") completeWithDisconnect(request, now, null) } } @@ -152,14 +153,15 @@ private class UnsentRequests { requests.add(request) } - def removeExpiredRequests(now: Long, unsentExpiryMs: Long): Collection[ClientRequest] = { + def removeAllTimedOut(now: Long): Collection[ClientRequest] = { val expiredRequests = new ArrayList[ClientRequest] for (requests <- unsent.values.asScala) { val requestIterator = requests.iterator var foundExpiredRequest = false while (requestIterator.hasNext && !foundExpiredRequest) { val request = requestIterator.next - if (request.createdTimeMs < now - unsentExpiryMs) { + val elapsedMs = Math.max(0, now - request.createdTimeMs) + if (elapsedMs > request.requestTimeoutMs) { expiredRequests.add(request) requestIterator.remove() foundExpiredRequest = true diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 7059ced5b3c..f8b56e8e008 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -135,7 +135,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]() - override val unsentExpiryMs: Int = config.requestTimeoutMs + override val requestTimeoutMs: Int = config.requestTimeoutMs newGauge( "UnknownDestinationQueueSize", diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala index 710686693e3..6838653ad13 100644 --- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala +++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala @@ -32,11 +32,12 @@ class InterBrokerSendThreadTest { private val time = new MockTime() private val networkClient = EasyMock.createMock(classOf[NetworkClient]) private val completionHandler = new StubCompletionHandler + private val requestTimeoutMs = 1000 @Test def shouldNotSendAnythingWhenNoRequests(): Unit = { val sendThread = new InterBrokerSendThread("name", networkClient, time) { - override val unsentExpiryMs: Int = 1000 + override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs override def generateRequests() = mutable.Iterable.empty } @@ -58,16 +59,18 @@ class InterBrokerSendThreadTest { val node = new Node(1, "", 8080) val handler = RequestAndCompletionHandler(node, request, completionHandler) val sendThread = new InterBrokerSendThread("name", networkClient, time) { - override val unsentExpiryMs: Int = 1000 + override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs override def generateRequests() = List[RequestAndCompletionHandler](handler) } - val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, handler.handler) + val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, + requestTimeoutMs, handler.handler) EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"), EasyMock.same(handler.request), EasyMock.anyLong(), EasyMock.eq(true), + EasyMock.eq(requestTimeoutMs), EasyMock.same(handler.handler))) .andReturn(clientRequest) @@ -93,16 +96,18 @@ class InterBrokerSendThreadTest { val node = new Node(1, "", 8080) val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler) val sendThread = new InterBrokerSendThread("name", networkClient, time) { - override val unsentExpiryMs: Int = 1000 + override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler) } - val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestAndCompletionHandler.handler) + val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, + requestTimeoutMs, requestAndCompletionHandler.handler) EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"), EasyMock.same(requestAndCompletionHandler.request), EasyMock.anyLong(), EasyMock.eq(true), + EasyMock.eq(requestTimeoutMs), EasyMock.same(requestAndCompletionHandler.handler))) .andReturn(clientRequest) @@ -135,17 +140,19 @@ class InterBrokerSendThreadTest { val node = new Node(1, "", 8080) val handler = RequestAndCompletionHandler(node, request, completionHandler) val sendThread = new InterBrokerSendThread("name", networkClient, time) { - override val unsentExpiryMs: Int = 1000 + override val requestTimeoutMs: Int = InterBrokerSendThreadTest.this.requestTimeoutMs override def generateRequests() = List[RequestAndCompletionHandler](handler) } - val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, handler.handler) + val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, + requestTimeoutMs, handler.handler) time.sleep(1500) EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"), EasyMock.same(handler.request), EasyMock.eq(time.milliseconds()), EasyMock.eq(true), + EasyMock.eq(requestTimeoutMs), EasyMock.same(handler.handler))) .andReturn(clientRequest) diff --git a/docs/upgrade.html b/docs/upgrade.html index 7061d6c42d9..6119536198a 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -105,6 +105,11 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>, <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li> + <li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds. + The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take. + Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from + <code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined + by <code>request.timeout.ms</code></li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The tool kafka.tools.ReplayLogProducer has been removed.</li> <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Decrease consumer request timeout to 30s > ---------------------------------------- > > Key: KAFKA-7050 > URL: https://issues.apache.org/jira/browse/KAFKA-7050 > Project: Kafka > Issue Type: Improvement > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Major > Fix For: 2.0.0 > > > Per KIP-266 discussion, we should lower the request timeout. We should also > add new logic to override this timeout for the JoinGroup request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)