Github user srdo commented on the issue:
https://github.com/apache/storm/pull/1924
Very sorry about this, but this is still broken :(
In the current implementation (prior to this PR), the spout can be prevented from polling if there are more than maxUncommittedOffsets failed tuples. This is because the check for whether to poll doesn't account for retriable tuples already being counted in numUncommittedOffsets.
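Roughly, the pre-PR gate looks like this (a sketch with made-up field names, not the actual code):

```java
// numUncommittedOffsets counts every emitted-but-uncommitted tuple, including
// failed tuples that are waiting to be retried.
private boolean poll() {
    return numUncommittedOffsets < maxUncommittedOffsets;
}
// Once maxUncommittedOffsets tuples have failed, this check never passes,
// so the spout can't poll to retry them and stalls.
```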
The current fix is to always allow failed tuples to retry, even if maxUncommittedOffsets is exceeded, because failed tuples don't contribute further to the numUncommittedOffsets count. The problem has been to ensure that we don't also emit a bunch of new tuples when polling for retriable tuples, and end up ignoring maxUncommittedOffsets entirely. This was partially fixed by pausing partitions that have no retriable tuples, as sketched below.
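A minimal sketch of that pausing step, assuming the retry service can report which partitions currently have retriable tuples (the method and parameters here are illustrative, not the actual code; pause/resume are the real KafkaConsumer APIs):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Pause every assigned partition that has no retriable tuples, so a poll that
// ignores maxUncommittedOffsets can only return records for retries.
static <K, V> ConsumerRecords<K, V> pollForRetries(KafkaConsumer<K, V> consumer,
        Set<TopicPartition> retriablePartitions, long pollTimeoutMs) {
    Set<TopicPartition> pausedForRetry = new HashSet<>(consumer.assignment());
    pausedForRetry.removeAll(retriablePartitions);
    consumer.pause(pausedForRetry);
    try {
        return consumer.poll(pollTimeoutMs);
    } finally {
        // Resume so these partitions can be fetched from on later polls
        consumer.resume(pausedForRetry);
    }
}
```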
With the previous seeking behavior, this was a complete fix, since maxPollRecords put a bound on how far past the committed offset the spout could read in a poll that was ignoring maxUncommittedOffsets. With the new seeking behavior this is broken. If the spout has emitted as many tuples as allowed, and the last (highest offset) tuple fails, the spout may now poll for a full batch of new tuples, starting from the failed tuple. For example, with maxUncommittedOffsets = 10 and maxPollRecords = 5, if tuples 0-9 are emitted and tuple 9 fails, the retry poll may fetch offsets 9-13 and emit 4 new tuples; if tuple 13 then fails, the same thing happens again. This scenario can repeat arbitrarily many times, so maxUncommittedOffsets is completely ineffective.
We don't want to go back to the old seeking behavior (IMO), because it meant that in the case where maxPollRecords is much lower than maxUncommittedOffsets (almost always), the spout might end up choking on failed tuples. For example, if maxPollRecords is 5, and tuples 0-4 are not ready for retry (they might have been retried already, and are now waiting out the retry backoff), but tuples 5-9 are, the spout is unable to retry 5-9 (or anything else on that partition) because it keeps seeking back to 0 and polling out the first 5 tuples. Seeking directly to the retriable tuples should in most cases be more efficient as well, because in the old implementation we'd just be seeking to the last committed offset, polling, and discarding tuples until we reached the ones that could be retried.
We could probably fix the broken behavior by trying really hard not to emit new tuples when we're ignoring maxUncommittedOffsets, but that seems like it would be error-prone and complicated to implement.
I think we might be able to fix this by ensuring that we don't "double-count" retriable tuples. When the spout is deciding whether to poll, it should deduct retriable tuples from numUncommittedOffsets when comparing against maxUncommittedOffsets.
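In code, the adjusted check could look roughly like this (a sketch; `retryService.readyMessageCount()` is a hypothetical helper, not the actual API):

```java
// Retriable tuples were already counted in numUncommittedOffsets when they
// were first emitted, so deduct them before comparing against the limit.
private boolean poll() {
    int numRetriableTuples = retryService.readyMessageCount(); // hypothetical
    return numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets;
}
```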
Changing the poll check in this way is the same as enforcing the following
constraint per partition, it seems to me:
* Poll only if `numNonRetriableEmittedTuples < maxUncommittedOffsets`. If there are at least that many non-retriable tuples, the poll won't be allowed, because `numUncommittedOffsets = numRetriableTuples + numNonRetriableEmittedTuples`, so `numUncommittedOffsets - numRetriableTuples >= maxUncommittedOffsets`.
This should mean that the limit on uncommitted tuples on each partition is going to be `maxUncommittedOffsets + maxPollRecords - 1`, because the latest tuple that can be retried on a partition is the one at offset `maxUncommittedOffsets`, where there are `maxUncommittedOffsets - 1` uncommitted tuples "to the left" of it. If the retry poll starts at that offset, it emits at most the retried tuple plus `maxPollRecords - 1` new tuples.
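To make the bound concrete with example numbers: with `maxUncommittedOffsets = 10` and `maxPollRecords = 5`, the worst case is 9 non-retriable uncommitted tuples plus a retriable tuple at the next offset. The retry poll seeks to that tuple and emits it plus at most 4 new tuples, leaving the partition with at most `9 + 1 + 4 = 14 = maxUncommittedOffsets + maxPollRecords - 1` uncommitted tuples.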
There shouldn't be any problems when multiple partitions have retriable tuples either, i.e. retriable tuples on one partition shouldn't be able to cause a different partition to break the uncommitted offset limit. This is because a partition will at minimum contribute 0 to numUncommittedOffsets (e.g. if all uncommitted tuples on that partition are retriable), since any retriable tuples being subtracted were already counted in numUncommittedOffsets when they were originally emitted.
If we can enforce the limit on a per-partition basis this way, there's no reason to worry about only emitting retriable tuples when we're exceeding maxUncommittedOffsets.
I don't think there's a need for pausing partitions anymore either. It was meant to prevent polling for new tuples when there were retriable tuples, but we're no longer trying to prevent that, since the per-partition cap already ensures we won't emit too many tuples. Pausing in this case would prioritize retriable tuples over new tuples (e.g. where an unpaused consumer might choose to fetch from a non-retriable partition even though there are retriable tuples), but might lead to lower throughput overall (in the case where there are not enough messages on the retriable partitions to fill a batch). I've removed it again.
I've put up what I hope is the fix both here and on the 1.x branch. Sorry
again that this has turned into such a moving target.