Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Very sorry about this, but this is still broken :( 
    
    In the current implementation (pre this PR), the spout can be prevented 
from polling if there are more than maxUncommittedOffsets failed tuples. This 
is because the check for whether to poll doesn't account for retriable tuples 
already being included in numUncommittedOffsets (sketched below).
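
    As a minimal sketch of the flawed gate (illustrative names, not the 
actual KafkaSpout fields), the decision to poll looks only at the raw count, 
so once enough emitted tuples fail, the check stays false and the spout can 
never poll to retry them:

    ```java
    // Illustrative sketch, not the real KafkaSpout code.
    long numUncommittedOffsets;   // emitted but not yet committed
    long maxUncommittedOffsets;   // configured cap

    boolean canPoll() {
        // Retriable failed tuples are still included in numUncommittedOffsets,
        // so they can block the very poll that would re-fetch them.
        return numUncommittedOffsets < maxUncommittedOffsets;
    }
    ```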
    
    The current fix is to always allow failed tuples to retry, even if 
maxUncommittedOffsets is exceeded, because failed tuples don't contribute 
further to the numUncommittedOffsets count. The difficulty has been ensuring 
that we don't also emit a bunch of new tuples when polling for retriable 
tuples, and end up ignoring maxUncommittedOffsets entirely. This was partially 
fixed by pausing partitions that have no retriable tuples (sketched below).
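
    For reference, a hedged sketch of that pausing step 
(KafkaConsumer.pause/resume/assignment are the real consumer API; the method 
and its parameters are otherwise illustrative, not the PR's exact code):

    ```java
    // Before a retry poll, pause every assigned partition that has nothing
    // ready for retry, so the poll can only return retriable records.
    ConsumerRecords<String, String> pollForRetries(
            KafkaConsumer<String, String> consumer,
            Set<TopicPartition> retriablePartitions,
            long pollTimeoutMs) {
        Set<TopicPartition> paused = new HashSet<>(consumer.assignment());
        paused.removeAll(retriablePartitions);
        consumer.pause(paused);
        try {
            return consumer.poll(pollTimeoutMs);
        } finally {
            consumer.resume(paused); // restore normal polling afterwards
        }
    }
    ```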
    
    With the previous seeking behavior, this was a complete fix, since 
maxPollRecords put a bound on how far past the commit offset the spout could 
read in a poll that ignored maxUncommittedOffsets. With the new seeking 
behavior this is broken. If the spout has emitted as many tuples as allowed, 
and the last (highest offset) tuple fails, the spout may now poll for a full 
batch of new tuples, starting from the failed tuple. This scenario can repeat 
arbitrarily many times, so maxUncommittedOffsets is completely ineffective.
    
    We don't want to go back to the old seeking behavior (IMO), because it 
meant that in the case where maxPollRecords is much lower than 
maxUncommittedOffsets (almost always), the spout might end up choking on 
failed tuples. For example, if maxPollRecords is 5, and tuples 0-4 are not 
ready for retry (they might have been retried already, and are now waiting out 
the retry backoff), but tuples 5-9 are, the spout is unable to retry 5-9 (or 
anything else on that partition) because it keeps seeking back to 0 and 
polling out the first 5 tuples. Seeking directly to the retriable tuples 
should in most cases be more efficient as well, because in the old 
implementation we'd just be seeking to the last committed offset, polling, and 
discarding tuples until we reached the ones that could be retried.
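
    To contrast the two seek strategies, a minimal sketch (KafkaConsumer.seek 
is the real API; the parameters here are illustrative):

    ```java
    void seekForRetry(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                      long lastCommittedOffset, long earliestRetriableOffset,
                      boolean oldBehavior) {
        if (oldBehavior) {
            // Old: rewind to the committed offset; everything before the
            // first retriable tuple is polled and then discarded.
            consumer.seek(tp, lastCommittedOffset);
        } else {
            // New: jump straight to the earliest offset ready for retry,
            // so the poll budget isn't wasted on non-retriable records.
            consumer.seek(tp, earliestRetriableOffset);
        }
    }
    ```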
    
    We could probably fix the broken behavior by trying really hard not to 
emit new tuples when we're ignoring maxUncommittedOffsets, but that seems like 
it would be error-prone and complicated to implement.
    
    I think we might be able to fix this by ensuring that we don't 
"double count" retriable tuples. When the spout is deciding whether to poll, 
it should deduct retriable tuples from numUncommittedOffsets when comparing to 
maxUncommittedOffsets (see the sketch below).
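
    Roughly, the adjusted gate would look like this (readyCount() is a 
hypothetical helper for "number of emitted tuples currently ready for retry", 
not an existing RetryService method):

    ```java
    // Illustrative sketch of the proposed check, not the final code.
    long numUncommittedOffsets;
    long maxUncommittedOffsets;

    boolean canPoll() {
        // Tuples ready for retry were already counted in numUncommittedOffsets
        // when first emitted, so deduct them before comparing to the cap.
        long numRetriable = retryService.readyCount(); // hypothetical helper
        return numUncommittedOffsets - numRetriable < maxUncommittedOffsets;
    }
    ```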
    
    Changing the poll check in this way is the same as enforcing the 
following constraint per partition, it seems to me (worked numbers below):
    * Poll only if `numNonRetriableEmittedTuples < maxUncommittedOffsets`. If 
there are more non-retriable tuples than that, the poll won't be allowed 
because `numUncommittedOffsets = numRetriableTuples + 
numNonRetriableEmittedTuples`, so `numUncommittedOffsets - numRetriableTuples 
>= maxUncommittedOffsets`.
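
    To make that concrete, a worked instance with made-up numbers:

    ```java
    int maxUncommittedOffsets = 10;
    int numRetriableTuples = 4;
    int numNonRetriableEmittedTuples = 8;
    int numUncommittedOffsets = numRetriableTuples + numNonRetriableEmittedTuples; // 12
    // Adjusted check: 12 - 4 = 8 < 10, so the retry poll is allowed even
    // though the raw count (12) exceeds the cap. With 10 non-retriable
    // tuples instead, the check would be 14 - 4 = 10 >= 10: poll blocked.
    boolean pollAllowed =
        numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets;
    ```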
    
    This should mean that the limit on uncommitted tuples on each partition 
is going to be `maxUncommittedOffsets + maxPollRecords - 1`, because the 
latest tuple that can be retried on a partition is the one 
`maxUncommittedOffsets` positions past the commit offset, with 
`maxUncommittedOffsets - 1` uncommitted tuples "to the left". If the retry 
poll starts at that offset, it at most emits the retried tuple plus 
`maxPollRecords - 1` new tuples.
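
    Plugging in illustrative numbers to check the bound:

    ```java
    int maxUncommittedOffsets = 10;
    int maxPollRecords = 5;
    // Worst case: the retriable tuple is the 10th past the commit offset,
    // with 9 uncommitted tuples to its left. The retry poll re-emits it
    // (already counted) plus up to maxPollRecords - 1 = 4 new tuples.
    int worstCase = (maxUncommittedOffsets - 1) + 1 + (maxPollRecords - 1); // 14
    // which matches maxUncommittedOffsets + maxPollRecords - 1 = 14
    ```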
    
    There shouldn't be any problems when multiple partitions have retriable 
tuples either, i.e. retriable tuples on one partition shouldn't be able to 
cause a different partition to break the uncommitted offset limit. This is 
because a partition will at minimum contribute 0 to numUncommittedOffsets 
(e.g. if all uncommitted tuples on that partition are retriable): any 
retriable tuples being subtracted were already counted in 
numUncommittedOffsets when they were originally emitted.
    
    If we can enforce the limit on a per-partition basis this way, there's no 
need to ensure that we emit only retriable tuples when we're exceeding 
maxUncommittedOffsets.
    
    I don't think there's a need for pausing partitions anymore either. It 
was meant to prevent polling for new tuples when there were retriable tuples, 
but we're no longer trying to prevent that, since the per-partition cap 
already ensures we won't emit too many tuples. Pausing in this case would 
prioritize retriable tuples over new tuples (e.g. where an unpaused consumer 
might choose to fetch from a non-retriable partition even though there are 
retriable tuples), but might lower overall throughput (where there are not 
enough messages on the retriable partitions to fill a batch). I've removed it 
again.
    
    I've put up what I hope is the fix both here and on the 1.x branch. Sorry 
again that this has turned into such a moving target.

