Github user srdo commented on the issue: https://github.com/apache/storm/pull/1924

@hmcl Thanks for the explanation. It makes more sense now.

I don't think we can meaningfully handle the case where the user has specified no cap on retries and one or more tuples keep failing. In the situation you describe, `maxUncommittedOffsets` isn't really solving the problem. If offset 3 fails and all other tuples can be acked, the spout will emit another `maxUncommittedOffsets` tuples on the same partition as 3, and will then stop emitting because it can't commit any tuples until 3 is acked. At this point, only offset 3 will get re-emitted until it hopefully succeeds at some point, so a continually failing tuple (or just one that fails many times) will still "clog" the spout.

If `maxUncommittedOffsets` is removed, you're right that the `acked` map size is technically uncapped, but given the amount of information we store, and how the spout handles retries, I think it is a non-issue. Just to illustrate, say we remove `maxUncommittedOffsets`, and let's say again that offset 3 is failing a large number of times, and the last acked offset is far beyond it, e.g. 5000000. When 3 is ready for retry, the consumer is reset to that position on the relevant partition. This means it fetches and emits message 3. It will also have to fetch 4...5000000 (potentially over many calls to `poll()`), because we never seek forward to where we left off. The spout therefore has to spend time fetching and discarding all tuples up to 5000000 before it can finally emit another tuple. It seems likely that it'll hit the point where it is spending all its time fetching and discarding acked tuples earlier than it'll run out of memory to store their offsets.

Disregarding the issue with consumer seeking, because my reasoning relies on an implementation detail of how we do retries, I'm still not sure `maxUncommittedOffsets` is allowing higher throughput compared to not having it.
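To make the cost concrete, here is a minimal, hypothetical simulation of the retry path described above (not Storm's actual code): when a failed offset is ready for retry, the consumer seeks back to it and then has to fetch every subsequent record up to the last acked offset, discarding the already-acked ones, before it reaches new data. All names are illustrative.

```java
public class RetrySeekCost {

    // Counts records the consumer would fetch and throw away when it seeks
    // back to failedOffset and scans forward to lastAckedOffset. Only the
    // failed offset itself gets re-emitted; everything else is discarded.
    static long discardedOnRetry(long failedOffset, long lastAckedOffset) {
        long discarded = 0;
        for (long offset = failedOffset; offset <= lastAckedOffset; offset++) {
            if (offset != failedOffset) {
                discarded++; // already acked, fetched only to be dropped
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        // The example from the comment: offset 3 keeps failing while the
        // last acked offset on the partition is 5,000,000.
        long wasted = discardedOnRetry(3, 5_000_000);
        System.out.println("Records fetched and discarded per retry: " + wasted);
    }
}
```

Each retry of offset 3 re-fetches roughly five million acked records just to emit one tuple, which is why the fetch-and-discard cost should dominate long before the `acked` map's memory footprint becomes a problem.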
If we allow the spout to fail with an `OutOfMemoryError`, the spout will have had higher throughput up to the crash than if it were being throttled by `maxUncommittedOffsets` (because otherwise it would also have had an OOME in that case). It really seems to me like all `maxUncommittedOffsets` is doing is trading having the spout potentially cause an OOME due to `acked` size, in exchange for making the spout react to the same situation by not emitting any more tuples. I'm not sure that is better, because data flow stops in either case.

`maxUncommittedOffsets` could have some value in a system where the `KafkaSpout` is a secondary stream source and the messages coming out of it aren't time sensitive. In that case it might be fine to let the `KafkaSpout` stop emitting tuples for a while if some tuples temporarily can't be acked, but having it cause an OOME would be too disruptive because the primary spout could be doing fine. I can't come up with a concrete example of this kind of configuration though. I'd be fine with keeping `maxUncommittedOffsets` for that kind of situation.