[ https://issues.apache.org/jira/browse/STORM-495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14129389#comment-14129389 ]

ASF GitHub Bot commented on STORM-495:
--------------------------------------

Github user rick-kilgore commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/254#discussion_r17396897
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
    @@ -191,6 +208,7 @@ public void ack(Long offset) {
                 _pending.headSet(offset - 
_spoutConfig.maxOffsetBehind).clear();
             }
             _pending.remove(offset);
    +        retryRecords.remove(offset);
    --- End diff ---
    
    I did notice this, but chose to leave it.  My understanding is that the 
topology is expected to eventually ack() all messages.  And if I understand 
right, if a worker JVM or host crashes, the message will eventually time out 
per the topology.message.timeout.secs setting in backtype.storm.Config, be 
re-submitted to the topology, and then be ack()ed by the new worker - since 
the new worker will still use the same Kafka offset as the key in my 
retryRecords map that the crashed worker was using.
    
    I'm not 100% sure that covers every case, but as written, retryRecords is 
cleaned up the same way cleanup of the _pending collection has always worked 
and still does.  Also, I wonder whether it is actually a good thing that a 
topology failing to ack its messages will show a visible problem - in this 
case, in the form of running out of memory.


> Add delayed retries to KafkaSpout
> ---------------------------------
>
>                 Key: STORM-495
>                 URL: https://issues.apache.org/jira/browse/STORM-495
>             Project: Apache Storm (Incubating)
>          Issue Type: Improvement
>    Affects Versions: 0.9.3-incubating
>         Environment: all environments
>            Reporter: Rick Kilgore
>            Priority: Minor
>              Labels: kafka, retry
>
> If a tuple in the topology originates from the KafkaSpout from the 
> external/storm-kafka sources, and if a bolt in the topology indicates a 
> failure by calling fail() on its OutputCollector, the KafkaSpout will 
> immediately retry the message.
> We wish to use this failure and retry behavior in our ingestion system 
> whenever we experience a recoverable error from a downstream system, such as 
> a 500 or 503 error from a service we depend on.  But with the current 
> KafkaSpout behavior, doing so results in a tight loop where we retry several 
> times over a few seconds and then give up.  I want to be able to delay retry 
> to give the downstream service some time to recover.  Ideally, I would like 
> to have configurable, exponential backoff retry.
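> The configurable exponential backoff requested above might look roughly like
> the following sketch.  The class and parameter names (RetryBackoff,
> baseDelayMs, maxDelayMs) are purely illustrative and not part of any
> existing SpoutConfig.

```java
// Hypothetical exponential backoff: the delay before retry attempt N
// doubles from a base interval up to a configurable cap, giving the
// downstream service time to recover between retries.
class RetryBackoff {
    final long baseDelayMs;  // delay before the first retry
    final long maxDelayMs;   // upper bound on any retry delay

    RetryBackoff(long baseDelayMs, long maxDelayMs) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Delay in ms before the given retry attempt (1 = first retry).
    long delayFor(int attempt) {
        long delay = baseDelayMs;
        // Double until we reach the attempt or hit the cap.
        for (int i = 1; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```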



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
