Github user pershyn commented on the pull request:

    https://github.com/apache/storm/pull/405#issuecomment-75567762
  
    Hi Xin,
    
    Thank you for the good finding and patch.
    
    I have applied the proposed patch on a pure storm-0.9.3 branch, deployed and tested it, and indeed it seems to fix the issue by skipping the failed offsets.
    However, when there are many failed messages with outdated offsets, the spout slowly asks kafka for each of them and discards the failed messages one at a time.
    
    In the logs this looks like the following (some warning log records were added for debugging purposes).
    Note that the "new offset" doesn't change.
    
    ```
    2015-02-23T14:19:30.080+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410182]
    2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
    2015-02-23T14:19:30.083+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410182
    2015-02-23T14:19:30.186+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094410183]
    2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
    2015-02-23T14:19:30.189+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094410183
    2015-02-23T14:19:30.291+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411596]
    2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Using new offset: 58094849835
    2015-02-23T14:19:30.293+0100 s.k.PartitionManager [WARN] Removing the failed offset that is out of range: 58094411596
    2015-02-23T14:19:30.396+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [58094411837]
    ```
    
    So I have added some logic to skip all the outdated failed offsets in one step; see the code snippet below:
    
    ```
                //fix bug [STORM-643] : remove this offset from failed list when it is OutOfRange
                if (had_failed) {
                    // In the EarliestTime case it is better to discard all failed offsets
                    // that are earlier than the actual earliest available offset,
                    // since they are no longer there anyway.
                    // This saves the corresponding calls to the broker API.

                    // In the LatestTime case it is debatable whether we should still try to
                    // reach the failed offsets (they may still be available).
                    // But by moving to LatestTime we are already discarding messages in the
                    // kafka queue, so with this configuration we assume the user accepts
                    // losing data and cares about the newest messages first.
                    // It therefore makes sense not to make an exception for the failed
                    // offsets and to discard them as well.

                    SortedSet<Long> omitted = failed.headSet(_emittedToOffset);

                    // Use the tail, since a SortedSet maintains its elements in ascending order.
                    // tailSet() returns a range-restricted view of the original set,
                    // so we could not add offsets that fall outside that range later on.
                    // For that reason we copy the tail into a new set that has no range restriction.
                    failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
                    LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
                }
    ```
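    
    For clarity, the copy in the last step is needed because `tailSet` only returns a range-restricted view of the backing set. Below is a minimal standalone sketch (not part of the patch, just an illustration of the standard `TreeSet` behaviour the snippet relies on; the class name is made up):
    
    ```
    import java.util.SortedSet;
    import java.util.TreeSet;
    
    public class TailSetRangeDemo {
        public static void main(String[] args) {
            TreeSet<Long> failed = new TreeSet<Long>();
            failed.add(100L);
            failed.add(200L);
            failed.add(300L);
    
            // tailSet() returns a view restricted to elements >= 150;
            // adding anything below 150 to this view would throw IllegalArgumentException.
            SortedSet<Long> tail = failed.tailSet(150L);
    
            // Copying the view into a fresh TreeSet removes the range restriction,
            // so offsets that fail later (and may be smaller) can still be added.
            TreeSet<Long> unrestricted = new TreeSet<Long>(tail);
            unrestricted.add(50L);                 // ok on the copy
            System.out.println(unrestricted);      // prints [50, 200, 300]
        }
    }
    ```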
    
    The outdated offsets are then skipped all at once, and we save several hundred calls to the kafka API:
    
    ```
    2015-02-23T15:07:21.913+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [8786892024]
    2015-02-23T15:07:21.915+0100 s.k.PartitionManager [WARN] Using new offset: 8789372723
    2015-02-23T15:07:21.916+0100 s.k.PartitionManager [WARN] Removing the failed offsets that are out of range: [8786892024, 8786892114, 8786892125, 8786892127, 8786892170, 8786892207, 8786892217, 8786892317, 8786892405, 8786892444, 8786892453, 8786892469, 8786892478, 8786892549, 8786892614, 8786892667, 8786892918, /* ... some omitted ... */ 8786977572, 8786977944, 8786986501, 8786991794, 8786991797, 8786994497, 8787001536]
    2015-02-23T15:07:32.759+0100 s.k.ZkCoordinator [INFO] Task [7/8] Refreshing partition manager connections
    ```
    
    If you agree with this improvement, I would be happy if you extended the pull request with this logic.
    
    Best regards and thanks,
    Michael

