Github user pershyn commented on the pull request:

    https://github.com/apache/storm/pull/405#issuecomment-75567762

Hi Xin,

Thank you for the good find and the patch. I applied the proposed patch on a clean storm-0.9.3 branch, deployed it, and tested it, and it does indeed fix the issue by skipping failed offsets.

However, when there are many failed messages with outdated offsets, the spout asks Kafka for each of them and then discards the failed messages one at a time. In the logs this looks like the following (some warning log records were added for debugging purposes). Note that the "new offset" does not change:

```
2015-02-23T14:19:30.080+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410182]
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410182
2015-02-23T14:19:30.186+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410183]
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410183
2015-02-23T14:19:30.291+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411596]
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094411596
2015-02-23T14:19:30.396+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411837]
```

So I added some logic to skip all the outdated failed offsets in one step; see the code snippet below:

```
// Fix for [STORM-643]: remove offsets from the failed list when they are out of range.
if (had_failed) {
    // In the EarliestTime case it is better to discard all failed offsets that are
    // earlier than the actual EarliestTime offset, since they are no longer there
    // anyway. This saves the corresponding calls to the broker API.
    // In the LatestTime case it is debatable whether we should still retry the
    // failed offsets (they may still be available). But by moving to LatestTime we
    // are already discarding messages in the Kafka queue; since it is configured
    // that way, assume the user accepts losing information and cares about the
    // newest messages first. It then makes sense not to treat the failed offsets
    // as an exception, and to discard them as well.
    SortedSet<Long> omitted = failed.headSet(_emittedToOffset);
    // Keep the tail, since a SortedSet maintains its elements in ascending order.
    // Using tailSet directly would impose a 'range' on the original set, so we
    // could no longer add elements that fall outside that range. For that reason
    // we copy the tail into a new set, which has no such range restriction.
    failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
    LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
}
```

With this change the outdated offsets are skipped at once, and we save several hundred calls to the Kafka API:

```
2015-02-23T15:07:21.913+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [8786892024]
2015-02-23T15:07:21.915+0100 s.k.PartitionManager [WARN] Using new offset: 8789372723
2015-02-23T15:07:21.916+0100 s.k.PartitionManager [WARN] Removing the failed offsets that are out of range: [8786892024, 8786892114, 8786892125, 8786892127, 8786892170, 8786892207, 8786892217, 8786892317, 8786892405, 8786892444, 8786892453, 8786892469, 8786892478, 8786892549, 8786892614, 8786892667, 8786892918, /* ... some omitted ... */ 8786977572, 8786977944, 8786986501, 8786991794, 8786991797, 8786994497, 8787001536]
2015-02-23T15:07:32.759+0100 s.k.ZkCoordinator [INFO] Task [7/8] Refreshing partition manager connections
```

If you agree with this improvement, I would be happy if you extended the pull request with this logic.

Best regards and thanks,
Michael
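As an aside, the reason for copying `tailSet` into a fresh `TreeSet` rather than using the view directly can be shown with a minimal stand-alone sketch. All offsets and variable names here are illustrative, not taken from the Storm code:

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class TailSetDemo {
    public static void main(String[] args) {
        // Hypothetical failed offsets, standing in for the spout's 'failed' set.
        TreeSet<Long> failed = new TreeSet<Long>();
        failed.add(100L);
        failed.add(200L);
        failed.add(300L);
        long emittedToOffset = 250L; // stand-in for _emittedToOffset

        // headSet: offsets strictly below the new offset -- these are discarded.
        SortedSet<Long> omitted = failed.headSet(emittedToOffset);
        System.out.println("omitted = " + omitted);

        // tailSet returns a range-restricted VIEW of the original set:
        // adding an element below the view's lower bound throws.
        SortedSet<Long> view = failed.tailSet(emittedToOffset);
        try {
            view.add(50L);
        } catch (IllegalArgumentException e) {
            System.out.println("view rejects out-of-range add");
        }

        // Copying into a new TreeSet drops the range restriction, so future
        // failed offsets (lower or higher) can still be added.
        TreeSet<Long> copy = new TreeSet<Long>(failed.tailSet(emittedToOffset));
        copy.add(50L); // fine: the copy has no range restriction
        System.out.println("copy = " + copy);
    }
}
```

Running this prints `omitted = [100, 200]`, the rejection message for the view, and `copy = [50, 300]`, which is why the snippet above copies the tail instead of keeping the view.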