[ https://issues.apache.org/jira/browse/STORM-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing updated STORM-2549:
--------------------------------------
    Fix Version/s: 2.0.0

> The fix for STORM-2343 is incomplete, and the spout can still get stuck on
> failed tuples
> ----------------------------------------------------------------------------------------
>
>                 Key: STORM-2549
>                 URL: https://issues.apache.org/jira/browse/STORM-2549
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>              Labels: pull-request-available
>             Fix For: 2.0.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Example:
> Say maxUncommittedOffsets is 10, maxPollRecords is 5, and the committedOffset
> is 0.
> The spout will initially emit up to offset 10, because it is allowed to poll
> until numNonRetriableTuples is >= maxUncommittedOffsets.
> The spout will be allowed to emit another 5 tuples if offset 10 fails, so if
> that happens, offsets 10-14 will get emitted. If offset 1 fails and 2-14 get
> acked, the spout gets stuck because it will count the "extra tuples" 11-14 in
> numNonRetriableTuples.
> A similar case is the one where maxPollRecords doesn't divide
> maxUncommittedOffsets evenly. If it were 3 in the example above, the spout
> might just immediately emit offsets 1-12. If 2-12 get acked, offset 1 cannot
> be reemitted.
> The proposed solution is the following:
> * Enforce maxUncommittedOffsets on a per partition basis (i.e. actual limit
> will be multiplied by the number of partitions) by always allowing poll for
> retriable tuples that are within maxUncommittedOffsets tuples of the
> committed offset. Pause any non-retriable partitions if the partition has
> passed the maxUncommittedOffsets limit, and some other partition is polling
> for retries while also at the maxUncommittedOffsets limit.
> Example of this functionality:
> MaxUncommittedOffsets is 100
> MaxPollRecords is 10
> Committed offset for partitions 0 and 1 is 0.
> Partition 0 has emitted 0
> Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
> Partition 1, message 99 is retriable
> We check that message 99 is within 100 emitted tuples of offset 0 (it is the
> 97th tuple after offset 0, so it is)
> We do not pause partition 0 because that partition isn't at the
> maxUncommittedOffsets limit.
> Seek to offset 99 on partition 1 and poll
> We get back offsets 99, 101, 103 and potentially 7 new tuples. Say the lowest
> of these is at offset 104.
> The spout emits offset 99, filters out 101 and 103 because they were already
> emitted, and emits the 7 new tuples.
> If offset 104 (or later) becomes retriable, it is not retried until the
> committed offset moves. This is because offset 104 is the 101st tuple emitted
> after offset 0, which puts it outside the maxUncommittedOffsets limit.
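
The per-partition retry check described in the example can be sketched roughly
as follows. This is only an illustration, not the storm-kafka-client
implementation: the class RetryWindowSketch and the methods emittedPosition and
mayRetry are hypothetical names, and counting emitted offsets strictly after
the committed offset is an assumed convention (the boundary case depends on how
the real spout counts).

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch of the per-partition check; not the actual spout code.
public class RetryWindowSketch {

    // Number of emitted offsets in (committedOffset, offset], i.e. the position
    // of `offset` among the tuples emitted after the committed offset.
    // Assumption: the committed offset itself is not counted.
    static int emittedPosition(NavigableSet<Long> emitted, long committedOffset, long offset) {
        if (offset <= committedOffset) {
            return 0; // at or before the committed offset, nothing to count
        }
        return emitted.subSet(committedOffset, false, offset, true).size();
    }

    // A retriable offset may be polled again only if it lies within
    // maxUncommittedOffsets emitted tuples of the committed offset.
    static boolean mayRetry(NavigableSet<Long> emitted, long committedOffset,
                            long offset, int maxUncommittedOffsets) {
        int position = emittedPosition(emitted, committedOffset, offset);
        return position >= 1 && position <= maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        // Partition 1 from the example: 0...95, 97, 99, 101, 103 emitted.
        NavigableSet<Long> emitted = new TreeSet<>();
        for (long o = 0; o <= 95; o++) {
            emitted.add(o);
        }
        emitted.add(97L);
        emitted.add(99L);
        emitted.add(101L);
        emitted.add(103L);

        System.out.println("position of 99: " + emittedPosition(emitted, 0L, 99L));
        System.out.println("may retry 99:   " + mayRetry(emitted, 0L, 99L, 100));
    }
}
```

With the offsets from the example, offset 99 counts as the 97th emitted tuple
after the committed offset, so it stays eligible for retry. Offsets far enough
past the limit are rejected until the committed offset advances. The pausing of
non-retriable partitions is not covered by this sketch.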