Repository: kafka Updated Branches: refs/heads/0.9.0 26f797931 -> 07e214130
KAFKA-2860: better handling of auto commit errors Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #553 from hachikuji/KAFKA-2860 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/07e21413 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/07e21413 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/07e21413 Branch: refs/heads/0.9.0 Commit: 07e214130fe7dc74cf2a5628232b8c3c3ef000bc Parents: 26f7979 Author: Jason Gustafson <[email protected]> Authored: Wed Nov 18 17:19:47 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 20 08:26:29 2015 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 10 ++- .../consumer/internals/ConsumerCoordinator.java | 91 +++++++++++++++----- 2 files changed, 79 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/07e21413/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a12c6c1..ddaa728 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -242,12 +242,16 @@ public abstract class AbstractCoordinator implements Closeable { private class HeartbeatTask implements DelayedTask { + private boolean requestInFlight = false; + public void reset() { // start or restart the heartbeat task to be executed at the next chance long now = time.milliseconds(); heartbeat.resetSessionTimeout(now); client.unschedule(this); - client.schedule(this, now); + + if (!requestInFlight) + client.schedule(this, now); } @Override @@ -270,10 +274,13 @@ public abstract class AbstractCoordinator implements Closeable { client.schedule(this, now + heartbeat.timeToNextHeartbeat(now)); } else { heartbeat.sentHeartbeat(now); + requestInFlight = true; + RequestFuture<Void> future = sendHeartbeatRequest(); future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { + requestInFlight = false; long now = time.milliseconds(); heartbeat.receiveHeartbeat(now); long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now); @@ -282,6 +289,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onFailure(RuntimeException e) { + requestInFlight = false; client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/07e21413/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ca08df0..93be7a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -69,7 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; - private DelayedTask autoCommitTask = null; + private final AutoCommitTask autoCommitTask; /** * Initialize the coordination manager. @@ -112,9 +112,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { addMetadataListener(); - if (autoCommitEnabled) - this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs); - + this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null; this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); } @@ -179,6 +177,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // give the assignor a chance to update internal state based on the received assignment assignor.onAssignment(assignment); + // restart the autocommit task if needed + if (autoCommitEnabled) + autoCommitTask.enable(); + // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.listener(); log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); @@ -308,8 +310,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public void close() { client.disableWakeups(); try { - if (autoCommitTask != null) - client.unschedule(autoCommitTask); maybeAutoCommitOffsetsSync(); } finally { super.close(); @@ -361,25 +361,74 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - private DelayedTask scheduleAutoCommitTask(final long interval) { - DelayedTask task = new DelayedTask() { - public void run(long now) { - commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) - log.error("Auto offset commit failed.", exception); - } - }); - client.schedule(this, now + interval); + private class AutoCommitTask implements DelayedTask { + private final long interval; + private boolean enabled = false; + private boolean requestInFlight = false; + + public AutoCommitTask(long interval) { + this.interval = interval; + } + + public void enable() { + if (!enabled) { + // there shouldn't be any instances scheduled, but call unschedule anyway to ensure + // that this task is only ever scheduled once + client.unschedule(this); + this.enabled = true; + + if (!requestInFlight) { + long now = time.milliseconds(); + client.schedule(this, interval + now); + } + } + } + + public void disable() { + this.enabled = false; + client.unschedule(this); + } + + private void reschedule(long at) { + if (enabled) + client.schedule(this, at); + } + + public void run(final long now) { + if (!enabled) + return; + + if (coordinatorUnknown()) { + log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff"); + client.schedule(this, now + retryBackoffMs); + return; } - }; - client.schedule(task, time.milliseconds() + interval); - return task; + + requestInFlight = true; + commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + requestInFlight = false; + if (exception == null) { + reschedule(now + interval); + } else if (exception instanceof SendFailedException) { + log.debug("Failed to send automatic offset commit, will retry immediately"); + reschedule(now); + } else { + log.warn("Auto offset commit failed: {}", exception.getMessage()); + reschedule(now + interval); + } + } + }); + } } private void maybeAutoCommitOffsetsSync() { if (autoCommitEnabled) { + // disable periodic commits prior to committing synchronously. note that they will + // be re-enabled after a rebalance completes + autoCommitTask.disable(); + try { commitOffsetsSync(subscriptions.allConsumed()); } catch (WakeupException e) { @@ -387,7 +436,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.error("Auto offset commit failed.", e); + log.warn("Auto offset commit failed: ", e.getMessage()); } } }
