Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl Thanks for the explanation. It makes more sense now.
    
    I don't think we can meaningfully handle the case where the user has 
specified no cap on retries and one or more tuples keep failing.
    
    In the situation you describe, `maxUncommittedOffsets` isn't really solving 
the problem. If offset 3 fails and all other tuples can be acked, the spout 
will emit another `maxUncommittedOffsets` tuples on the same partition as 
offset 3, and will then stop emitting because it can't commit any offsets until 
offset 3 is acked. At this point, only offset 3 will get re-emitted until it 
hopefully succeeds, so a continually failing tuple (or just one that fails many 
times) will still "clog" the spout.
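    To make the stall concrete, here is a simplified model of that emission gate. The names and structure are hypothetical, not the actual KafkaSpout code: it just assumes the spout stops emitting once the uncommitted count reaches the cap, and that the commit point can only advance over a contiguous run of acked offsets.

```java
import java.util.TreeSet;

// Hypothetical sketch of the KafkaSpout emission gate, not the real code.
public class EmissionGateSketch {

    /**
     * Emits offsets until the uncommitted count reaches the cap, acking
     * everything except {@code failedOffset}. Returns {emitted, committed}.
     */
    static long[] run(int maxUncommittedOffsets, long failedOffset, long committed) {
        TreeSet<Long> acked = new TreeSet<>();
        long next = committed + 1;
        long emitted = 0;
        // Emission gate: stop once uncommitted tuples reach the cap.
        while (next - committed - 1 < maxUncommittedOffsets) {
            long offset = next++;
            emitted++;
            if (offset != failedOffset) {
                acked.add(offset); // everything but the failed tuple acks
            }
            // The commit point only advances over a contiguous run of acked
            // offsets, so the failed offset pins it in place.
            while (acked.contains(committed + 1)) {
                committed = acked.pollFirst();
            }
        }
        return new long[] {emitted, committed};
    }

    public static void main(String[] args) {
        long[] r = run(10, 3, 2);
        // The spout emits maxUncommittedOffsets tuples past the failure and
        // then stalls: only the failed offset is re-emitted from here on.
        System.out.println("emitted=" + r[0] + " committed=" + r[1]);
    }
}
```

    With `maxUncommittedOffsets = 10`, a committed offset of 2 and offset 3 failing, the model emits exactly 10 tuples and then stalls with the commit point still at 2, which is the "clogged" state described above.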
    
    If `maxUncommittedOffsets` is removed, you're right that the `acked` map 
size is technically uncapped, but given the amount of information we store, and 
how the spout handles retries, I think it is a non-issue. Just to illustrate, 
say we remove `maxUncommittedOffsets`, and let's say again that offset 3 is 
failing a large number of times, and the last acked offset is far beyond it, 
e.g. 5000000. When 3 is ready for retry, the consumer is reset to that position 
on the relevant partition. This means it fetches and emits message 3. It will 
also have to fetch 4...5000000 (potentially over many calls to `poll()`), 
because we never seek forward to where we left off. The spout therefore has to 
spend time fetching and discarding all tuples up to 5000000 before it can 
finally emit another tuple. It seems likely that it'll hit the point where it 
is spending all the time fetching and discarding acked tuples earlier than 
it'll run out of memory to store their offsets.
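    A back-of-the-envelope sketch of that trade-off, using the numbers above. The class and method names are hypothetical, and the ~8 bytes per stored offset is an assumption (boxed `Long`s in the `acked` map cost more, but the order of magnitude holds):

```java
// Hypothetical cost sketch: a retry seeks the consumer back to the failed
// offset, and since we never seek forward, every record up to the last acked
// offset is re-fetched and discarded on each retry.
public class RetrySeekCostSketch {

    /** Already-acked records fetched and discarded per retry of the failed offset. */
    static long discardedPerRetry(long failedOffset, long lastAckedOffset) {
        return lastAckedOffset - failedOffset;
    }

    /**
     * Rough bytes needed to remember the acked offsets in the same window,
     * assuming ~8 bytes per offset (an assumption; real overhead is higher).
     */
    static long ackedMapBytes(long failedOffset, long lastAckedOffset) {
        return (lastAckedOffset - failedOffset) * 8;
    }

    public static void main(String[] args) {
        long failed = 3, lastAcked = 5_000_000;
        // ~5 million records re-fetched and thrown away per retry, versus
        // a few tens of MB to remember the acked offsets in the meantime.
        System.out.println("discarded per retry: " + discardedPerRetry(failed, lastAcked));
        System.out.println("acked map, approx MB: "
                + ackedMapBytes(failed, lastAcked) / (1024 * 1024));
    }
}
```

    In other words, each retry re-fetches roughly five million records to emit a single tuple, while storing the corresponding offsets costs only tens of megabytes, which is why fetch-and-discard time seems likely to dominate before memory does.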
    
    Even disregarding the consumer seeking issue (since that reasoning relies 
on an implementation detail of how we do retries), I'm still not sure 
`maxUncommittedOffsets` allows higher throughput than not having it. If we 
instead let the spout fail with an `OutOfMemoryError`, it will have had higher 
throughput up to the crash than a spout throttled by `maxUncommittedOffsets`, 
which would have stopped emitting in the same situation. It really seems to me 
like all `maxUncommittedOffsets` does is trade the risk of an OOME due to the 
`acked` map size for having the spout react to the same situation by not 
emitting any more tuples. I'm not sure that is better, because data flow stops 
in either case.
    
    `maxUncommittedOffsets` could have some value in a system where the 
`KafkaSpout` is a secondary stream source and the messages coming out of it 
aren't time sensitive. In that case it might be fine to let the `KafkaSpout` 
stop emitting tuples for a while if some tuples temporarily can't be acked, but 
having it cause an OOME would be too disruptive because the primary spout could 
be doing fine. I can't come up with a concrete example of this kind of 
configuration though.
    
    I'd be fine with keeping `maxUncommittedOffsets` for that kind of situation.

