STORM-1705: Update storm-kafka readme
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c706afc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c706afc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c706afc Branch: refs/heads/1.x-branch Commit: 5c706afcb0922b891f1eda0f5483c89272491d12 Parents: 0f1aec5 Author: Abhishek Agarwal <[email protected]> Authored: Thu Jun 9 18:36:49 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Thu Jun 9 18:36:49 2016 +0530 ---------------------------------------------------------------------- external/storm-kafka/README.md | 55 ++++++++++++++++++-- .../ExponentialBackoffMsgRetryManager.java | 3 +- .../storm/kafka/FailedMsgRetryManager.java | 3 +- .../apache/storm/kafka/PartitionManager.java | 3 +- .../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++-- .../ExponentialBackoffMsgRetryManagerTest.java | 2 +- 6 files changed, 64 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/README.md ---------------------------------------------------------------------- diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index 5a34b55..91a6553 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -63,13 +63,19 @@ In addition to these parameters, SpoutConfig contains the following fields that // setting for how often to save the current Kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; - // Exponential back-off retry settings. These are used when retrying messages after a bolt - // calls OutputCollector.fail(). - // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent - // resubmitting the message while still retrying. 
+ // Retry strategy for failed messages + public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName(); + + // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt + // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used. + // Initial delay between successive retries public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; + + // Maximum delay between successive retries public long retryDelayMaxMs = 60 * 1000; + // Failed messages will be retried indefinitely if retryLimit is zero or negative. + public int retryLimit = -1; ``` Core KafkaSpout only accepts an instance of SpoutConfig. @@ -113,6 +119,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message. +### Failed message retry +FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays +between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name +of the implementation. Here is the interface: + +```java + // Retry manager initialization can go here. This can be called multiple times during the lifecycle of a worker. + void prepare(SpoutConfig spoutConfig, Map stormConf); + + // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset. + void failed(Long offset); + + // Message corresponding to offset has been acked. + void acked(Long offset); + + // Message corresponding to the offset has been re-emitted and is in transit.
+ void retryStarted(Long offset); + + /** + * The offset of the next failed message to be re-emitted. Spout will fetch messages starting from this offset + * and resend them, except completed messages. + */ + Long nextFailedMessageToRetry(); + + /** + * @return True if the message corresponding to the offset should be emitted NOW. False otherwise. + */ + boolean shouldReEmitMsg(Long offset); + + /** + * Spout will clean up the state for this offset if false is returned. If retryFurther returns true, + * the spout will call failed(offset) next, and acked(offset) otherwise. + */ + boolean retryFurther(Long offset); + + /** + * Clear any offsets before kafkaOffset. These offsets are no longer available in Kafka. + */ + Set<Long> clearOffsetsBefore(Long kafkaOffset); +``` + #### Version incompatibility In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultScheme` and the related Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java index 7b5f5dd..b2cfaf0 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java @@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager } - public void prepare(SpoutConfig spoutConfig) { + public void prepare(SpoutConfig spoutConfig, Map stormConf) { this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs; this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier; this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs; @@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager public boolean retryFurther(Long offset) { MessageRetryRecord record = this.records.get(offset); return ! 
(record != null && + this.retryLimit > 0 && this.waiting.contains(record) && this.retryLimit <= record.retryNum); } http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java index 9a3b19f..5e2cc5f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java @@ -18,6 +18,7 @@ package org.apache.storm.kafka; import java.io.Serializable; +import java.util.Map; import java.util.Set; public interface FailedMsgRetryManager extends Serializable { @@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable { /** * Initialization */ - void prepare(SpoutConfig spoutConfig); + void prepare(SpoutConfig spoutConfig, Map stormConf); /** * Message corresponding to the offset failed in kafka spout. 
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index d6e51b7..b65f053 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -77,7 +77,7 @@ public class PartitionManager { try { _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); - _failedMsgRetryManager.prepare(spoutConfig); + _failedMsgRetryManager.prepare(spoutConfig, _stormConf); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", FailedMsgRetryManager.class, @@ -284,6 +284,7 @@ public class PartitionManager { this._failedMsgRetryManager.failed(offset); } else { // state for the offset should be cleaned up + LOG.warn("Will not retry failed kafka offset {} further", offset); _messageIneligibleForRetryCount.incr(); _pending.remove(offset); this._failedMsgRetryManager.acked(offset); http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java index 415ce0a..aa93c24 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java @@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements 
Serializable { // setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; - // Exponential back-off retry settings. These are used when retrying messages after a bolt - // calls OutputCollector.fail(). + // Retry strategy for failed messages + public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName(); + + // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt + // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used. public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000; - public int retryLimit = Integer.MAX_VALUE; - public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName(); + public int retryLimit = -1; public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java index f2815e2..2f53612 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java @@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest { spoutConfig.retryDelayMaxMs = retryDelayMaxMs; spoutConfig.retryLimit = retryLimit; ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager(); - 
exponentialBackoffMsgRetryManager.prepare(spoutConfig); + exponentialBackoffMsgRetryManager.prepare(spoutConfig, null); return exponentialBackoffMsgRetryManager; } }
