Github user hmcl commented on the issue:
https://github.com/apache/storm/pull/1924
@srdo I am reading through your detailed explanation again and will let you
know my opinion on whether or not we need the parameter
`maxUncommittedOffsets`, based on all the facts you state.
Before that, however, I want to share right away the rationale behind
`maxUncommittedOffsets` and why I think we still need it.
`maxUncommittedOffsets` is unrelated to
`ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and to
`Config#TOPOLOGY_MAX_SPOUT_PENDING`. Its purpose is to allow the `KafkaSpout`
to keep polling/fetching records from Kafka while imposing a (memory
consumption) limit on how many offsets can be **polled but not committed over
ANY ARBITRARY NUMBER of polls to Kafka (not just one poll)** before the
KafkaSpout stops polling.
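
To illustrate the distinction, here is a minimal sketch of that gate, assuming hypothetical field and method names (these are illustrative, not the actual `KafkaSpout` members):

```java
// A minimal sketch, assuming hypothetical names: polling is gated on the count of
// emitted-but-uncommitted offsets, independently of max.spout.pending and of how
// many records any single poll returns.
public class PollGateSketch {
    private final int maxUncommittedOffsets;
    private int numUncommittedOffsets;

    PollGateSketch(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    /** The spout keeps fetching only while the uncommitted-offset count is below the cap. */
    boolean shouldPoll() {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }
}
```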
This limit is necessary because if, for example, the tuple for offset **3** keeps
failing, and all subsequent offsets **(4, 5, 6, ... 1M, ... 1B, ...)** are
acked, the spout won't be able to commit to Kafka any offset greater than 3,
unless that tuple reaches **maxNumberOfRetrials**. If there is no limit on
**maxNumberOfRetrials** (in order to guarantee at-least-once delivery), and
offset 3 keeps failing forever, the spout will never be able to commit any
offset > 3. In the limit this would cause the `Map<TopicPartition,
OffsetManager> acked` map to grow without bound (despite its small footprint,
which consists mainly of offsets and some small bookkeeping objects - it
doesn't hold `ConsumerRecord`s).
In order to put a cap on the size of this map, `maxUncommittedOffsets` was
created to make the spout stop polling when this number gets too high, and so
avoid an `OutOfMemoryError`. **The reason the KafkaSpout keeps polling even
though a particular offset (tuple) keeps failing is to increase the overall
throughput.** If at some point offset 3 is successfully acked, the maximal run
of contiguous acked offsets is committed to Kafka on the next
`Timer commitTimer` cycle, and the size of the `Map<TopicPartition, OffsetManager>
acked` map shrinks (potentially by a lot). The spout can then keep doing
its work and the overall throughput is much higher.
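
As a concrete, hypothetical illustration of that scenario (the offsets and the `TreeSet` bookkeeping below are illustrative, not the actual `OffsetManager` internals):

```java
import java.util.TreeSet;

// Sketch: only the contiguous run of acked offsets immediately after the last
// committed offset can be committed; a single failing offset blocks everything
// behind it, and acking it later unblocks a large commit at once.
public class ContiguousCommitSketch {
    public static void main(String[] args) {
        long committedOffset = 2;            // last offset committed to Kafka
        TreeSet<Long> acked = new TreeSet<>();
        for (long o = 4; o <= 1_000; o++) {  // 4..1000 acked, but 3 keeps failing
            acked.add(o);
        }

        long nextCommit = committedOffset;
        while (acked.contains(nextCommit + 1)) {
            nextCommit++;
        }
        System.out.println(nextCommit);      // still 2: blocked by offset 3, so acked keeps growing

        // Once offset 3 is finally acked, the whole run becomes committable on the
        // next commit cycle and the bookkeeping shrinks dramatically.
        acked.add(3L);
        while (acked.contains(nextCommit + 1)) {
            nextCommit++;
        }
        System.out.println(nextCommit);      // 1000
        acked.headSet(nextCommit, true).clear();
        System.out.println(acked.size());    // 0
    }
}
```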
`maxUncommittedOffsets` is not a hard limit, and I don't think
that we have to enforce that the KafkaSpout never exceeds it. It is OK
to exceed it as long as the JVM doesn't throw an `OutOfMemoryError`. Currently,
and prior to these proposed changes, in the worst case the number of
uncommitted offsets is upper bounded by **maxUncommittedOffsets +
maxFetchRecords - 1**, which is OK as long as there is memory.
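
To make that bound concrete with assumed, illustrative values: the last permitted poll can be issued while the spout already has `maxUncommittedOffsets - 1` uncommitted offsets, and that poll can return up to `maxFetchRecords` more.

```java
// Worked example of the worst-case bound with illustrative (assumed) values.
public class WorstCaseBoundSketch {
    public static void main(String[] args) {
        int maxUncommittedOffsets = 10_000;  // illustrative value
        int maxFetchRecords = 500;           // illustrative value, e.g. max.poll.records

        // The spout may still poll at (maxUncommittedOffsets - 1) uncommitted offsets,
        // and that poll can add up to maxFetchRecords more before polling stops.
        int worstCase = (maxUncommittedOffsets - 1) + maxFetchRecords;
        System.out.println(worstCase);       // 10499 = maxUncommittedOffsets + maxFetchRecords - 1
    }
}
```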
Perhaps `maxUncommittedOffsets` is not the clearest name. We can
either change the name (which can cause backward compatibility issues and/or
confusion), or properly document what this parameter really means. The bottom
line is that I think we need a parameter doing the job that
`maxUncommittedOffsets` is currently doing, which I don't think can be
accomplished with a combination of `ConsumerConfig#MAX_POLL_RECORDS_CONFIG`
and/or `Config#TOPOLOGY_MAX_SPOUT_PENDING`.
A more precise name for `maxUncommittedOffsets` would be something like
`allowPollingIfLessThanMaxUncommittedOffsets`.
Please let me know your thoughts. I will still go over your explanation
again in the meantime.