Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 0315b015d -> 05bd1e366
STORM-643 fixing based on master branch also add UT Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05bd1e36 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05bd1e36 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05bd1e36 Branch: refs/heads/0.10.x-branch Commit: 05bd1e36697fe7038284833032da262afb82ac99 Parents: 0315b01 Author: Xin Wang <best.wang...@163.com> Authored: Sat Jul 18 11:39:51 2015 +0800 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Tue Jul 21 00:26:12 2015 +0900 ---------------------------------------------------------------------- .../ExponentialBackoffMsgRetryManager.java | 21 ++++++++++++++-- .../jvm/storm/kafka/FailedMsgRetryManager.java | 3 +++ .../src/jvm/storm/kafka/PartitionManager.java | 12 +++++++++ .../ExponentialBackoffMsgRetryManagerTest.java | 26 +++++++++++++++++++- 4 files changed, 59 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/05bd1e36/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java index 8c0bbe1..0d74fb8 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java @@ -18,10 +18,12 @@ package storm.kafka; import java.util.Comparator; -import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager { @@ -30,7 +32,7 @@ public class 
ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager private final long retryDelayMaxMs; private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); - private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>(); + private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>(); public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) { this.retryInitialDelayMs = retryInitialDelayMs; @@ -91,6 +93,21 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager System.currentTimeMillis() >= record.retryTimeUTC; } + @Override + public Set<Long> clearInvalidMessages(Long kafkaOffset) { + Set<Long> invalidOffsets = new HashSet<Long>(); + for(Long offset : records.keySet()){ + if(offset < kafkaOffset){ + MessageRetryRecord record = this.records.remove(offset); + if (record != null) { + this.waiting.remove(record); + invalidOffsets.add(offset); + } + } + } + return invalidOffsets; + } + /** * A MessageRetryRecord holds the data of how many times a message has * failed and been retried, and when the last failure occurred. 
It can http://git-wip-us.apache.org/repos/asf/storm/blob/05bd1e36/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java index 3f0e117..30c9a24 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java @@ -17,10 +17,13 @@ */ package storm.kafka; +import java.util.Set; + public interface FailedMsgRetryManager { public void failed(Long offset); public void acked(Long offset); public void retryStarted(Long offset); public Long nextFailedMessageToRetry(); public boolean shouldRetryMsg(Long offset); + public Set<Long> clearInvalidMessages(Long kafkaOffset); } http://git-wip-us.apache.org/repos/asf/storm/blob/05bd1e36/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index a7ed879..ce18677 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -171,6 +171,18 @@ public class PartitionManager { _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); LOG.warn("Using new offset: {}", _emittedToOffset); // fetch failed, so don't update the metrics + + //fix bug [STORM-643] : remove outdated failed offsets + if (!processingNewTuples) { + // For the case of EarliestTime it would be better to discard + // all the failed offsets, that are earlier than actual EarliestTime + // offset, since they are anyway not there. + // These calls to broker API will be then saved. 
+ Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset); + + LOG.warn("Removing the failed offsets that are out of range: {}", omitted); + } + return; } long end = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/storm/blob/05bd1e36/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java index 3dd8b38..11ad5b7 100644 --- a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java @@ -29,6 +29,8 @@ public class ExponentialBackoffMsgRetryManagerTest { private static final Long TEST_OFFSET = 101L; private static final Long TEST_OFFSET2 = 102L; + private static final Long TEST_OFFSET3 = 105L; + private static final Long TEST_NEW_OFFSET = 103L; @Test public void testImmediateRetry() throws Exception { @@ -208,4 +210,26 @@ public class ExponentialBackoffMsgRetryManagerTest { assertEquals("expect test offset next available for retry", TEST_OFFSET, next); assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); } -} \ No newline at end of file + + @Test + public void testClearInvalidMessages() throws Exception { + ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + manager.failed(TEST_OFFSET); + manager.failed(TEST_OFFSET2); + manager.failed(TEST_OFFSET3); + + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); + assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3)); + + 
manager.clearInvalidMessages(TEST_NEW_OFFSET); + + Long next = manager.nextFailedMessageToRetry(); + assertEquals("expect test offset next available for retry", TEST_OFFSET3, next); + + manager.acked(TEST_OFFSET3); + next = manager.nextFailedMessageToRetry(); + assertNull("expect no message ready after acked", next); + } + +}