Repository: kafka Updated Branches: refs/heads/trunk 916edc3a4 -> dcea49856
KAFKA-4777; Backoff properly in consumer heartbeat thread if no brokers are available Author: Allen Xiang <allen.xi...@monsanto.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2564 from allenxiang/client-heartbeat-fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dcea4985 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dcea4985 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dcea4985 Branch: refs/heads/trunk Commit: dcea49856805a039f0859facf169a87a574c06d3 Parents: 916edc3 Author: Allen Xiang <allen.xi...@monsanto.com> Authored: Sat Feb 18 09:25:28 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Sat Feb 18 09:25:28 2017 -0800 ---------------------------------------------------------------------- .../clients/consumer/internals/AbstractCoordinator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dcea4985/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 350a84b..1c2d607 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -888,9 +888,9 @@ public abstract class AbstractCoordinator implements Closeable { long now = time.milliseconds(); if (coordinatorUnknown()) { - if (findCoordinatorFuture == null) - lookupCoordinator(); - else + if (findCoordinatorFuture != null || lookupCoordinator().failed()) + // the immediate future check ensures that we backoff properly in the case that no + // brokers are available to connect to. AbstractCoordinator.this.wait(retryBackoffMs); } else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should @@ -941,7 +941,7 @@ public abstract class AbstractCoordinator implements Closeable { log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); this.failed.set(new RuntimeException(e)); } catch (RuntimeException e) { - log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); + log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } finally { log.debug("Heartbeat thread for group {} has closed", groupId);