http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index b65a5b7..07edd3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; @@ -36,27 +37,34 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; /** - * Higher level consumer access to the network layer with basic support for futures and - * task scheduling. This class is not thread-safe, except for wakeup(). + * Higher level consumer access to the network layer with basic support for request futures. This class + * is thread-safe, but provides no synchronization for response callbacks. This guarantees that no locks + * are held when they are invoked. 
*/ public class ConsumerNetworkClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); + // the mutable state of this class is protected by the object's monitor (excluding the wakeup + // flag and the request completion queue below). private final KafkaClient client; - private final AtomicBoolean wakeup = new AtomicBoolean(false); - private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue(); private final Map<Node, List<ClientRequest>> unsent = new HashMap<>(); private final Metadata metadata; private final Time time; private final long retryBackoffMs; private final long unsentExpiryMs; - - // this count is only accessed from the consumer's main thread private int wakeupDisabledCount = 0; + // when requests complete, they are transferred to this queue prior to invocation. The purpose + // is to avoid invoking them while holding the lock above. + private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>(); + + // this flag allows the client to be safely woken up without waiting on the lock above. It is + // atomic to avoid the need to acquire the lock above in order to enable it concurrently. + private final AtomicBoolean wakeup = new AtomicBoolean(false); public ConsumerNetworkClient(KafkaClient client, Metadata metadata, @@ -71,25 +79,6 @@ public class ConsumerNetworkClient implements Closeable { } /** - * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and - * should only be used for coarse synchronization. - * @param task The task to be scheduled - * @param at The time it should run - */ - public void schedule(DelayedTask task, long at) { - delayedTasks.add(task, at); - } - - /** - * Unschedule a task. This will remove all instances of the task from the task queue. - * This is a no-op if the task is not scheduled. - * @param task The task to be unscheduled. 
- */ - public void unschedule(DelayedTask task) { - delayedTasks.remove(task); - } - - /** * Send a new request. Note that the request is not actually transmitted on the * network until one of the {@link #poll(long)} variants is invoked. At this * point the request will either be transmitted successfully or will fail. @@ -104,25 +93,36 @@ public class ConsumerNetworkClient implements Closeable { public RequestFuture<ClientResponse> send(Node node, ApiKeys api, AbstractRequest request) { + return send(node, api, ProtoUtils.latestVersion(api.id), request); + } + + private RequestFuture<ClientResponse> send(Node node, + ApiKeys api, + short version, + AbstractRequest request) { long now = time.milliseconds(); - RequestFutureCompletionHandler future = new RequestFutureCompletionHandler(); - RequestHeader header = client.nextRequestHeader(api); + RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); + RequestHeader header = client.nextRequestHeader(api, version); RequestSend send = new RequestSend(node.idString(), header, request.toStruct()); - put(node, new ClientRequest(now, true, send, future)); - return future; + put(node, new ClientRequest(now, true, send, completionHandler)); + return completionHandler.future; } private void put(Node node, ClientRequest request) { - List<ClientRequest> nodeUnsent = unsent.get(node); - if (nodeUnsent == null) { - nodeUnsent = new ArrayList<>(); - unsent.put(node, nodeUnsent); + synchronized (this) { + List<ClientRequest> nodeUnsent = unsent.get(node); + if (nodeUnsent == null) { + nodeUnsent = new ArrayList<>(); + unsent.put(node, nodeUnsent); + } + nodeUnsent.add(request); } - nodeUnsent.add(request); } public Node leastLoadedNode() { - return client.leastLoadedNode(time.milliseconds()); + synchronized (this) { + return client.leastLoadedNode(time.milliseconds()); + } } /** @@ -149,6 +149,8 @@ public class ConsumerNetworkClient implements Closeable { * on the current poll if one is active, or the 
next poll. */ public void wakeup() { + // wakeup should be safe without holding the client lock since it simply delegates to + // Selector's wakeup, which is threadsafe this.wakeup.set(true); this.client.wakeup(); } @@ -175,7 +177,7 @@ public class ConsumerNetworkClient implements Closeable { long remaining = timeout; long now = begin; do { - poll(remaining, now, true); + poll(remaining, now); now = time.milliseconds(); long elapsed = now - begin; remaining = timeout - elapsed; @@ -189,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable { * @throws WakeupException if {@link #wakeup()} is called from another thread */ public void poll(long timeout) { - poll(timeout, time.milliseconds(), true); + poll(timeout, time.milliseconds()); } /** @@ -198,7 +200,37 @@ public class ConsumerNetworkClient implements Closeable { * @param now current time in milliseconds */ public void poll(long timeout, long now) { - poll(timeout, now, true); + // there may be handlers which need to be invoked if we woke up the previous call to poll + firePendingCompletedRequests(); + + synchronized (this) { + // send all the requests we can send now + trySend(now); + + // do the network IO, blocking for at most the requested timeout; handlers for + // completed requests are only queued here and fired after the lock is released + client.poll(timeout, now); + now = time.milliseconds(); + + // handle any disconnects by failing the active requests. 
note that disconnects must + // be checked immediately following poll since any subsequent call to client.ready() + // will reset the disconnect status + checkDisconnects(now); + + // trigger wakeups after checking for disconnects so that the callbacks will be ready + // to be fired on the next call to poll() + maybeTriggerWakeup(); + + // try again to send requests since buffer space may have been + // cleared or a connect finished in the poll + trySend(now); + + // fail requests that couldn't be sent if they have expired + failExpiredRequests(now); + } + + // called without the lock to avoid deadlock potential if handlers need to acquire locks + firePendingCompletedRequests(); } /** @@ -208,49 +240,12 @@ public class ConsumerNetworkClient implements Closeable { public void pollNoWakeup() { disableWakeups(); try { - poll(0, time.milliseconds(), false); + poll(0, time.milliseconds()); } finally { enableWakeups(); } } - private void poll(long timeout, long now, boolean executeDelayedTasks) { - // send all the requests we can send now - trySend(now); - - // ensure we don't poll any longer than the deadline for - // the next scheduled task - timeout = Math.min(timeout, delayedTasks.nextTimeout(now)); - clientPoll(timeout, now); - now = time.milliseconds(); - - // handle any disconnects by failing the active requests. note that disconnects must - // be checked immediately following poll since any subsequent call to client.ready() - // will reset the disconnect status - checkDisconnects(now); - - // execute scheduled tasks - if (executeDelayedTasks) - delayedTasks.poll(now); - - // try again to send requests since buffer space may have been - // cleared or a connect finished in the poll - trySend(now); - - // fail requests that couldn't be sent if they have expired - failExpiredRequests(now); - } - - /** - * Execute delayed tasks now. 
- * @param now current time in milliseconds - * @throws WakeupException if a wakeup has been requested - */ - public void executeDelayedTasks(long now) { - delayedTasks.poll(now); - maybeTriggerWakeup(); - } - /** * Block until all pending requests from the given node have finished. * @param node The node to await requests from @@ -267,9 +262,11 @@ public class ConsumerNetworkClient implements Closeable { * @return The number of pending requests */ public int pendingRequestCount(Node node) { - List<ClientRequest> pending = unsent.get(node); - int unsentCount = pending == null ? 0 : pending.size(); - return unsentCount + client.inFlightRequestCount(node.idString()); + synchronized (this) { + List<ClientRequest> pending = unsent.get(node); + int unsentCount = pending == null ? 0 : pending.size(); + return unsentCount + client.inFlightRequestCount(node.idString()); + } } /** @@ -278,10 +275,22 @@ public class ConsumerNetworkClient implements Closeable { * @return The total count of pending requests */ public int pendingRequestCount() { - int total = 0; - for (List<ClientRequest> requests: unsent.values()) - total += requests.size(); - return total + client.inFlightRequestCount(); + synchronized (this) { + int total = 0; + for (List<ClientRequest> requests: unsent.values()) + total += requests.size(); + return total + client.inFlightRequestCount(); + } + } + + private void firePendingCompletedRequests() { + for (;;) { + RequestFutureCompletionHandler completionHandler = pendingCompletion.poll(); + if (completionHandler == null) + break; + + completionHandler.fireCompletion(); + } } private void checkDisconnects(long now) { @@ -315,9 +324,8 @@ public class ConsumerNetworkClient implements Closeable { while (requestIterator.hasNext()) { ClientRequest request = requestIterator.next(); if (request.createdTimeMs() < now - unsentExpiryMs) { - RequestFutureCompletionHandler handler = - (RequestFutureCompletionHandler) request.callback(); - handler.raise(new 
TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); requestIterator.remove(); } else break; @@ -327,15 +335,20 @@ public class ConsumerNetworkClient implements Closeable { } } - protected void failUnsentRequests(Node node, RuntimeException e) { + public void failUnsentRequests(Node node, RuntimeException e) { // clear unsent requests to node and fail their corresponding futures - List<ClientRequest> unsentRequests = unsent.remove(node); - if (unsentRequests != null) { - for (ClientRequest request : unsentRequests) { - RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); - handler.raise(e); + synchronized (this) { + List<ClientRequest> unsentRequests = unsent.remove(node); + if (unsentRequests != null) { + for (ClientRequest request : unsentRequests) { + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + handler.onFailure(e); + } } } + + // called without the lock to avoid deadlock potential + firePendingCompletedRequests(); } private boolean trySend(long now) { @@ -356,11 +369,6 @@ public class ConsumerNetworkClient implements Closeable { return requestsSent; } - private void clientPoll(long timeout, long now) { - client.poll(timeout, now); - maybeTriggerWakeup(); - } - private void maybeTriggerWakeup() { if (wakeupDisabledCount == 0 && wakeup.get()) { wakeup.set(false); @@ -369,24 +377,30 @@ public class ConsumerNetworkClient implements Closeable { } public void disableWakeups() { - wakeupDisabledCount++; + synchronized (this) { + wakeupDisabledCount++; + } } public void enableWakeups() { - if (wakeupDisabledCount <= 0) - throw new IllegalStateException("Cannot enable wakeups since they were never disabled"); + synchronized (this) { + if (wakeupDisabledCount 
<= 0) + throw new IllegalStateException("Cannot enable wakeups since they were never disabled"); - wakeupDisabledCount--; + wakeupDisabledCount--; - // re-wakeup the client if the flag was set since previous wake-up call - // could be cleared by poll(0) while wakeups were disabled - if (wakeupDisabledCount == 0 && wakeup.get()) - this.client.wakeup(); + // re-wakeup the client if the flag was set since previous wake-up call + // could be cleared by poll(0) while wakeups were disabled + if (wakeupDisabledCount == 0 && wakeup.get()) + this.client.wakeup(); + } } @Override public void close() throws IOException { - client.close(); + synchronized (this) { + client.close(); + } } /** @@ -395,7 +409,9 @@ public class ConsumerNetworkClient implements Closeable { * @param node Node to connect to if possible */ public boolean connectionFailed(Node node) { - return client.connectionFailed(node); + synchronized (this) { + return client.connectionFailed(node); + } } /** @@ -405,26 +421,45 @@ public class ConsumerNetworkClient implements Closeable { * @param node The node to connect to */ public void tryConnect(Node node) { - client.ready(node, time.milliseconds()); + synchronized (this) { + client.ready(node, time.milliseconds()); + } } - public static class RequestFutureCompletionHandler - extends RequestFuture<ClientResponse> - implements RequestCompletionHandler { + public class RequestFutureCompletionHandler implements RequestCompletionHandler { + private final RequestFuture<ClientResponse> future; + private ClientResponse response; + private RuntimeException e; - @Override - public void onComplete(ClientResponse response) { - if (response.wasDisconnected()) { + public RequestFutureCompletionHandler() { + this.future = new RequestFuture<>(); + } + + public void fireCompletion() { + if (e != null) { + future.raise(e); + } else if (response.wasDisconnected()) { ClientRequest request = response.request(); RequestSend send = request.request(); ApiKeys api = 
ApiKeys.forId(send.header().apiKey()); int correlation = send.header().correlationId(); log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", api, request, correlation, send.destination()); - raise(DisconnectException.INSTANCE); + future.raise(DisconnectException.INSTANCE); } else { - complete(response); + future.complete(response); } } + + public void onFailure(RuntimeException e) { + this.e = e; + pendingCompletion.add(this); + } + + @Override + public void onComplete(ClientResponse response) { + this.response = response; + pendingCompletion.add(this); + } } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java deleted file mode 100644 index 61663f8..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals; - - -public interface DelayedTask { - - /** - * Execute the task. 
- * @param now current time in milliseconds - */ - void run(long now); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java deleted file mode 100644 index 61cab20..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals; - -import java.util.Iterator; -import java.util.PriorityQueue; - -/** - * Tracks a set of tasks to be executed after a delay. - */ -public class DelayedTaskQueue { - - private PriorityQueue<Entry> tasks; - - public DelayedTaskQueue() { - tasks = new PriorityQueue<Entry>(); - } - - /** - * Schedule a task for execution in the future. 
- * - * @param task the task to execute - * @param at the time at which to - */ - public void add(DelayedTask task, long at) { - tasks.add(new Entry(task, at)); - } - - /** - * Remove a task from the queue if it is present - * @param task the task to be removed - * @returns true if a task was removed as a result of this call - */ - public boolean remove(DelayedTask task) { - boolean wasRemoved = false; - Iterator<Entry> iterator = tasks.iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - if (entry.task.equals(task)) { - iterator.remove(); - wasRemoved = true; - } - } - return wasRemoved; - } - - /** - * Get amount of time in milliseconds until the next event. Returns Long.MAX_VALUE if no tasks are scheduled. - * - * @return the remaining time in milliseconds - */ - public long nextTimeout(long now) { - if (tasks.isEmpty()) - return Long.MAX_VALUE; - else - return Math.max(tasks.peek().timeout - now, 0); - } - - /** - * Run any ready tasks. - * - * @param now the current time - */ - public void poll(long now) { - while (!tasks.isEmpty() && tasks.peek().timeout <= now) { - Entry entry = tasks.poll(); - entry.task.run(now); - } - } - - private static class Entry implements Comparable<Entry> { - DelayedTask task; - long timeout; - - public Entry(DelayedTask task, long timeout) { - this.task = task; - this.timeout = timeout; - } - - @Override - public int compareTo(Entry entry) { - return Long.compare(timeout, entry.timeout); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 913ce9e..84278c6 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -65,6 +65,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; /** * This class manage the fetching process with the brokers. @@ -84,7 +85,7 @@ public class Fetcher<K, V> { private final Metadata metadata; private final FetchManagerMetrics sensors; private final SubscriptionState subscriptions; - private final List<CompletedFetch> completedFetches; + private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; @@ -115,7 +116,7 @@ public class Fetcher<K, V> { this.checkCrcs = checkCrcs; this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; - this.completedFetches = new ArrayList<>(); + this.completedFetches = new ConcurrentLinkedQueue<>(); this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; } @@ -127,7 +128,8 @@ public class Fetcher<K, V> { public void sendFetches() { for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) { final FetchRequest request = fetchEntry.getValue(); - client.send(fetchEntry.getKey(), ApiKeys.FETCH, request) + final Node fetchTarget = fetchEntry.getKey(); + client.send(fetchTarget, ApiKeys.FETCH, request) .addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { @@ -148,7 +150,7 @@ public class Fetcher<K, V> { @Override public void onFailure(RuntimeException e) { - log.debug("Fetch failed", e); + log.debug("Fetch request to {} failed", fetchTarget, e); } }); } @@ -353,16 +355,14 @@ public class Fetcher<K, V> { } else { Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>(); int recordsRemaining = 
maxPollRecords; - Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator(); while (recordsRemaining > 0) { if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { - if (!completedFetchesIterator.hasNext()) + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) break; - CompletedFetch completion = completedFetchesIterator.next(); - completedFetchesIterator.remove(); - nextInLineRecords = parseFetchedData(completion); + nextInLineRecords = parseFetchedData(completedFetch); } else { recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); } @@ -510,6 +510,8 @@ public class Fetcher<K, V> { long position = this.subscriptions.position(partition); fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); log.trace("Added fetch request for partition {} at offset {}", partition, position); + } else { + log.trace("Skipping fetch for partition {} because there is an inflight request to {}", partition, node); } } @@ -845,4 +847,5 @@ public class Fetcher<K, V> { recordsFetched.record(records); } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java index 79e17e2..dff1006 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -16,26 +16,41 @@ package org.apache.kafka.clients.consumer.internals; * A helper class for managing the heartbeat to the coordinator */ public final class Heartbeat { - private final long timeout; - private final long interval; + private final long sessionTimeout; + private final long 
heartbeatInterval; + private final long maxPollInterval; + private final long retryBackoffMs; - private long lastHeartbeatSend; + private volatile long lastHeartbeatSend; // volatile since it is read by metrics private long lastHeartbeatReceive; private long lastSessionReset; + private long lastPoll; + private boolean heartbeatFailed; - public Heartbeat(long timeout, - long interval, - long now) { - if (interval >= timeout) + public Heartbeat(long sessionTimeout, + long heartbeatInterval, + long maxPollInterval, + long retryBackoffMs) { + if (heartbeatInterval >= sessionTimeout) throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout"); - this.timeout = timeout; - this.interval = interval; - this.lastSessionReset = now; + this.sessionTimeout = sessionTimeout; + this.heartbeatInterval = heartbeatInterval; + this.maxPollInterval = maxPollInterval; + this.retryBackoffMs = retryBackoffMs; + } + + public void poll(long now) { + this.lastPoll = now; } public void sentHeartbeat(long now) { this.lastHeartbeatSend = now; + this.heartbeatFailed = false; + } + + public void failHeartbeat() { + this.heartbeatFailed = true; } public void receiveHeartbeat(long now) { @@ -52,23 +67,34 @@ public final class Heartbeat { public long timeToNextHeartbeat(long now) { long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset); + final long delayToNextHeartbeat; + if (heartbeatFailed) + delayToNextHeartbeat = retryBackoffMs; + else + delayToNextHeartbeat = heartbeatInterval; - if (timeSinceLastHeartbeat > interval) + if (timeSinceLastHeartbeat > delayToNextHeartbeat) return 0; else - return interval - timeSinceLastHeartbeat; + return delayToNextHeartbeat - timeSinceLastHeartbeat; } public boolean sessionTimeoutExpired(long now) { - return now - Math.max(lastSessionReset, lastHeartbeatReceive) > timeout; + return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout; } public long interval() { - return interval; + 
return heartbeatInterval; } - public void resetSessionTimeout(long now) { + public void resetTimeouts(long now) { this.lastSessionReset = now; + this.lastPoll = now; + this.heartbeatFailed = false; + } + + public boolean pollTimeoutExpired(long now) { + return now - lastPoll > maxPollInterval; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java index 71c16fa..b21d13e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java @@ -196,7 +196,7 @@ public class RequestFuture<T> { } public static RequestFuture<Void> voidSuccess() { - RequestFuture<Void> future = new RequestFuture<Void>(); + RequestFuture<Void> future = new RequestFuture<>(); future.complete(null); return future; } http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index d27ec8a..313477f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -572,9 +572,28 @@ public class Protocol { new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols that the member supports")); + public static final Schema JOIN_GROUP_REQUEST_V1 = new Schema(new Field("group_id", + STRING, + "The group id."), + new 
Field("session_timeout", + INT32, + "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."), + new Field("rebalance_timeout", + INT32, + "The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group"), + new Field("member_id", + STRING, + "The assigned consumer id or an empty string for a new consumer."), + new Field("protocol_type", + STRING, + "Unique name for class of protocols implemented by group"), + new Field("group_protocols", + new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), + "List of protocols that the member supports")); public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING), new Field("member_metadata", BYTES)); + public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16), new Field("generation_id", INT32, @@ -591,8 +610,10 @@ public class Protocol { new Field("members", new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); - public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0}; - public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0}; + public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; + + public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1}; + public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1}; /* SyncGroup api */ public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING), http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 
14a6c1d..2845ee0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -24,10 +24,11 @@ import java.util.Collections; import java.util.List; public class JoinGroupRequest extends AbstractRequest { - + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout"; + private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout"; private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols"; @@ -38,6 +39,7 @@ public class JoinGroupRequest extends AbstractRequest { private final String groupId; private final int sessionTimeout; + private final int rebalanceTimeout; private final String memberId; private final String protocolType; private final List<ProtocolMetadata> groupProtocols; @@ -60,14 +62,40 @@ public class JoinGroupRequest extends AbstractRequest { } } + // v0 constructor + @Deprecated + public JoinGroupRequest(String groupId, + int sessionTimeout, + String memberId, + String protocolType, + List<ProtocolMetadata> groupProtocols) { + this(0, groupId, sessionTimeout, sessionTimeout, memberId, protocolType, groupProtocols); + } + public JoinGroupRequest(String groupId, int sessionTimeout, + int rebalanceTimeout, String memberId, String protocolType, List<ProtocolMetadata> groupProtocols) { - super(new Struct(CURRENT_SCHEMA)); + this(1, groupId, sessionTimeout, rebalanceTimeout, memberId, protocolType, groupProtocols); + } + + private JoinGroupRequest(int version, + String groupId, + int sessionTimeout, + int rebalanceTimeout, + String memberId, + String protocolType, + List<ProtocolMetadata> groupProtocols) { + 
super(new Struct(ProtoUtils.requestSchema(ApiKeys.JOIN_GROUP.id, version))); + struct.set(GROUP_ID_KEY_NAME, groupId); struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); + + if (version >= 1) + struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout); + struct.set(MEMBER_ID_KEY_NAME, memberId); struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); @@ -82,6 +110,7 @@ public class JoinGroupRequest extends AbstractRequest { struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray()); this.groupId = groupId; this.sessionTimeout = sessionTimeout; + this.rebalanceTimeout = rebalanceTimeout; this.memberId = memberId; this.protocolType = protocolType; this.groupProtocols = groupProtocols; @@ -89,8 +118,17 @@ public class JoinGroupRequest extends AbstractRequest { public JoinGroupRequest(Struct struct) { super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME); + + if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME)) + // rebalance timeout is added in v1 + rebalanceTimeout = struct.getInt(REBALANCE_TIMEOUT_KEY_NAME); + else + // v0 had no rebalance timeout but used session timeout implicitly + rebalanceTimeout = sessionTimeout; + memberId = struct.getString(MEMBER_ID_KEY_NAME); protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME); @@ -107,13 +145,16 @@ public class JoinGroupRequest extends AbstractRequest { public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { switch (versionId) { case 0: + case 1: return new JoinGroupResponse( + versionId, Errors.forException(e).code(), JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupResponse.UNKNOWN_PROTOCOL, JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId Collections.<String, ByteBuffer>emptyMap()); + default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id))); @@ -128,6 +169,10 @@ public class JoinGroupRequest extends AbstractRequest { return sessionTimeout; } + public int rebalanceTimeout() { + return rebalanceTimeout; + } + public String memberId() { return memberId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index dd829ed..8895ace 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -24,7 +24,8 @@ import java.util.List; import java.util.Map; public class JoinGroupResponse extends AbstractRequestResponse { - + + private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id); private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -65,7 +66,17 @@ public class JoinGroupResponse extends AbstractRequestResponse { String memberId, String leaderId, Map<String, ByteBuffer> groupMembers) { - super(new Struct(CURRENT_SCHEMA)); + this(CURRENT_VERSION, errorCode, generationId, groupProtocol, memberId, leaderId, groupMembers); + } + + public JoinGroupResponse(int version, + short errorCode, + int generationId, + String groupProtocol, + String memberId, + String leaderId, + Map<String, ByteBuffer> groupMembers) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version))); struct.set(ERROR_CODE_KEY_NAME, errorCode); struct.set(GENERATION_ID_KEY_NAME, generationId); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index a76f48e..6cf93a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -50,8 +50,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse { * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 * GROUP_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_GROUP (16) - * ILLEGAL_GENERATION (22) - * UNKNOWN_MEMBER_ID (25) * TOPIC_AUTHORIZATION_FAILED (29) * GROUP_AUTHORIZATION_FAILED (30) */ http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8b52664..8d2ac00 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -90,8 +90,7 @@ public class KafkaConsumerTest { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { - KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>( - props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } catch (KafkaException e) { assertEquals(oldInitCount + 1, 
MockMetricsReporter.INIT_COUNT.get()); assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); @@ -314,17 +313,17 @@ public class KafkaConsumerTest { props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - return new KafkaConsumer<byte[], byte[]>( - props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); } @Test - public void verifyHeartbeatSent() { + public void verifyHeartbeatSent() throws Exception { String topic = "topic"; TopicPartition partition = new TopicPartition(topic, 0); + int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; - int heartbeatIntervalMs = 3000; + int heartbeatIntervalMs = 1000; int autoCommitIntervalMs = 10000; Time time = new MockTime(); @@ -337,7 +336,7 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override @@ -370,9 +369,6 @@ public class KafkaConsumerTest { consumer.poll(0); assertEquals(Collections.singleton(partition), consumer.assignment()); - // heartbeat interval is 2 seconds - time.sleep(heartbeatIntervalMs); - final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); client.prepareResponseFrom(new MockClient.RequestMatcher() { @Override @@ -382,18 +378,23 @@ public class KafkaConsumerTest { } }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator); + // heartbeat interval is 2 seconds + time.sleep(heartbeatIntervalMs); + Thread.sleep(heartbeatIntervalMs); + consumer.poll(0); 
assertTrue(heartbeatReceived.get()); } @Test - public void verifyHeartbeatSentWhenFetchedDataReady() { + public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { String topic = "topic"; TopicPartition partition = new TopicPartition(topic, 0); + int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; - int heartbeatIntervalMs = 3000; + int heartbeatIntervalMs = 1000; int autoCommitIntervalMs = 10000; Time time = new MockTime(); @@ -406,7 +407,7 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { @@ -438,8 +439,6 @@ public class KafkaConsumerTest { client.respondFrom(fetchResponse(partition, 0, 5), node); client.poll(0, time.milliseconds()); - time.sleep(heartbeatIntervalMs); - client.prepareResponseFrom(fetchResponse(partition, 5, 0), node); final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); client.prepareResponseFrom(new MockClient.RequestMatcher() { @@ -450,6 +449,9 @@ public class KafkaConsumerTest { } }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator); + time.sleep(heartbeatIntervalMs); + Thread.sleep(heartbeatIntervalMs); + consumer.poll(0); assertTrue(heartbeatReceived.get()); @@ -459,6 +461,7 @@ public class KafkaConsumerTest { public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { String topic = "topic"; final TopicPartition partition = new TopicPartition(topic, 0); + int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 3000; int heartbeatIntervalMs = 2000; int autoCommitIntervalMs = 1000; @@ -473,7 +476,7 @@ public class KafkaConsumerTest { 
PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.assign(Arrays.asList(partition)); consumer.seekToBeginning(Arrays.asList(partition)); @@ -496,6 +499,7 @@ public class KafkaConsumerTest { long offset1 = 10000; long offset2 = 20000; + int rebalanceTimeoutMs = 6000; int sessionTimeoutMs = 3000; int heartbeatIntervalMs = 2000; int autoCommitIntervalMs = 1000; @@ -510,7 +514,7 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.assign(Arrays.asList(partition1)); // lookup coordinator @@ -541,6 +545,7 @@ public class KafkaConsumerTest { String topic = "topic"; final TopicPartition partition = new TopicPartition(topic, 0); + int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; @@ -558,7 +563,7 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { @@ -619,6 +624,7 @@ public class KafkaConsumerTest { String topic = "topic"; final TopicPartition partition = new TopicPartition(topic, 0); + int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int 
heartbeatIntervalMs = 3000; @@ -636,7 +642,7 @@ public class KafkaConsumerTest { PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, - sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { @@ -725,6 +731,7 @@ public class KafkaConsumerTest { KafkaClient client, Metadata metadata, PartitionAssignor assignor, + int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, int autoCommitIntervalMs) { @@ -757,6 +764,7 @@ public class KafkaConsumerTest { ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator( consumerClient, groupId, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, assignors, @@ -800,6 +808,9 @@ public class KafkaConsumerTest { metrics, subscriptions, metadata, + autoCommitEnabled, + autoCommitIntervalMs, + heartbeatIntervalMs, retryBackoffMs, requestTimeoutMs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 7a05eb1..77f9df5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -16,15 +16,20 @@ **/ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; 
import org.apache.kafka.clients.MockClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.GroupCoordinatorResponse; +import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestUtils; @@ -37,12 +42,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AbstractCoordinatorTest { private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]); - private static final int SESSION_TIMEOUT_MS = 30000; + private static final int REBALANCE_TIMEOUT_MS = 60000; + private static final int SESSION_TIMEOUT_MS = 10000; private static final int HEARTBEAT_INTERVAL_MS = 3000; private static final long RETRY_BACKOFF_MS = 100; private static final long REQUEST_TIMEOUT_MS = 40000; @@ -77,8 +85,8 @@ public class AbstractCoordinatorTest { @Test public void testCoordinatorDiscoveryBackoff() { - mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); // blackout the coordinator for 50 milliseconds to simulate a disconnect. // after backing off, we should be able to connect. 
@@ -91,17 +99,65 @@ public class AbstractCoordinatorTest { assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS); } - private Struct groupCoordinatorResponse(Node node, short error) { - GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); + @Test + public void testUncaughtExceptionInHeartbeatThread() throws Exception { + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE)); + mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); + + + final RuntimeException e = new RuntimeException(); + + // raise the error when the background thread tries to send a heartbeat + mockClient.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + if (request.request().header().apiKey() == ApiKeys.HEARTBEAT.id) + throw e; + return false; + } + }, heartbeatResponse(Errors.UNKNOWN)); + + try { + coordinator.ensureActiveGroup(); + mockTime.sleep(HEARTBEAT_INTERVAL_MS); + synchronized (coordinator) { + coordinator.notify(); + } + Thread.sleep(100); + + coordinator.pollHeartbeat(mockTime.milliseconds()); + fail("Expected pollHeartbeat to raise an error"); + } catch (RuntimeException exception) { + assertEquals(exception, e); + } + } + + private Struct groupCoordinatorResponse(Node node, Errors error) { + GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node); return response.toStruct(); } + private Struct heartbeatResponse(Errors error) { + HeartbeatResponse response = new HeartbeatResponse(error.code()); + return response.toStruct(); + } + + private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) { + return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId, + Collections.<String, ByteBuffer>emptyMap()).toStruct(); + } + + private Struct syncGroupResponse(Errors error) 
{ + return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)).toStruct(); + } + public class DummyCoordinator extends AbstractCoordinator { public DummyCoordinator(ConsumerNetworkClient client, Metrics metrics, Time time) { - super(client, GROUP_ID, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, + super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS); }
