Repository: incubator-twill Updated Branches: refs/heads/master 388a6d922 -> b740da438
(TWILL-161) Added back off logic to SimpleKafkaConsumer - Avoid excessive amount of polling and logs in case of failure This closes #76 on GitHub. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/b740da43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/b740da43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/b740da43 Branch: refs/heads/master Commit: b740da4385ec542268508882e10069832f5ec35c Parents: 388a6d9 Author: Terence Yim <[email protected]> Authored: Tue Jan 5 14:04:29 2016 -0800 Committer: Terence Yim <[email protected]> Committed: Tue Jan 5 15:40:33 2016 -0800 ---------------------------------------------------------------------- .../kafka/client/SimpleKafkaConsumer.java | 48 +++++++++++--- .../apache/twill/kafka/client/KafkaTest.java | 68 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java index 230521c..8cfe889 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java @@ -72,7 +72,8 @@ final class SimpleKafkaConsumer implements KafkaConsumer { private static final int SO_TIMEOUT = 5 * 1000; // 5 seconds. private static final int MAX_WAIT = 1000; // 1 second. 
private static final long CONSUMER_EXPIRE_MINUTES = 1L; // close consumer if not used for 1 minute. - private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L; // Sleep for 2 seconds if failure in consumer. + private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer. + private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer. private static final long EMPTY_FETCH_WAIT = 500L; // Sleep for 500 ms if no message is fetched. private final BrokerService brokerService; @@ -328,16 +329,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer { final AtomicLong offset = new AtomicLong(startOffset); Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null; - + ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF, + MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS); while (running) { if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) { LOG.debug("No leader for topic partition {}.", topicPart); - try { - TimeUnit.MILLISECONDS.sleep(CONSUMER_FAILURE_RETRY_INTERVAL); - } catch (InterruptedException e) { - // OK to ignore this, as interrupt would be caused by thread termination. - LOG.trace("Consumer sleep interrupted.", e); - } + backoff.backoff(); continue; } @@ -375,10 +372,12 @@ final class SimpleKafkaConsumer implements KafkaConsumer { // Call the callback invokeCallback(messages, offset); + backoff.reset(); } catch (Throwable t) { if (running || !(t instanceof ClosedByInterruptException)) { // Only log if it is still running, otherwise, it is just the interrupt caused by the stop. 
LOG.info("Exception when fetching message on {}.", topicPart, t); + backoff.backoff(); } consumers.refresh(consumerEntry.getKey()); consumerEntry = null; @@ -477,5 +476,38 @@ final class SimpleKafkaConsumer implements KafkaConsumer { } }; } + + /** + * Helper class for performing exponential backoff on message fetching failure. + */ + private final class ExponentialBackoff { + private final long initialBackoff; + private final long maxBackoff; + private final TimeUnit backoffUnit; + private int failureCount = 0; + + private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) { + this.initialBackoff = initialBackoff; + this.maxBackoff = maxBackoff; + this.backoffUnit = backoffUnit; + } + + void backoff() { + failureCount++; + long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1)); + long backoff = Math.min(initialBackoff * multiplier, maxBackoff); + backoff = backoff < 0 ? maxBackoff : backoff; + try { + backoffUnit.sleep(backoff); + } catch (InterruptedException e) { + // OK to ignore since this method is called from the consumer thread only, which on thread shutdown, + // the thread will be interrupted + } + } + + void reset() { + failureCount = 0; + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/b740da43/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---------------------------------------------------------------------- diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java index 93119ab..4ac8ae4 100644 --- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java +++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java @@ -42,8 +42,11 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; import 
java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** @@ -84,6 +87,71 @@ public class KafkaTest { } @Test + public void testKafkaClientReconnect() throws Exception { + String topic = "backoff"; + Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff"); + EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig); + + ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build(); + zkClient.startAndWait(); + try { + zkClient.create("/", null, CreateMode.PERSISTENT).get(); + + ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); + kafkaClient.startAndWait(); + + try { + server.startAndWait(); + try { + // Publish a message + createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start(); + + // Create a consumer + final BlockingQueue<String> queue = new LinkedBlockingQueue<>(); + Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0) + .consume(new KafkaConsumer.MessageCallback() { + @Override + public void onReceived(Iterator<FetchedMessage> messages) { + while (messages.hasNext()) { + queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString()); + } + } + + @Override + public void finished() { + } + }); + + // Wait for the first message + Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS)); + + // Shutdown the server + server.stopAndWait(); + + // Start the server again. 
+ // Needs to create a new instance with the same config since guava service cannot be restarted + server = new EmbeddedKafkaServer(kafkServerConfig); + server.startAndWait(); + + // Publish another message + createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start(); + + // Should be able to get the second message + Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS)); + + cancel.cancel(); + } finally { + kafkaClient.stopAndWait(); + } + } finally { + server.stopAndWait(); + } + } finally { + zkClient.stopAndWait(); + } + } + + @Test public void testKafkaClient() throws Exception { String topic = "testClient";
