Github user revans2 commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-187425242
Looking through the code I think we are keeping track of way too much
state. In some places we cannot help it, but in most places we can probably
rely on storm to keep track of the state for us.
First of all, we probably don't need failed, emittedTuples, or blacklist.
Just put the information we need in the message id and let Storm keep track of
it for you. The message ID that you pass in never leaves the spout and is
never read by Storm, except to check whether it is null, so it becomes very
convenient for implementing a best-effort replay cache.
```
public class MessageId {
    private final TopicPartition topicPart;
    private final long offset;
    private final long epoc;
    private final List<Object> values;
    private int numFails;
    ...
}
```
To implement blacklisting, instead of clearing state and shifting things
around, just increment an epoch counter for the entire spout. Then in the logic
for fail (where we do the replay) we would have something like:
```
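public void fail(Object mid) {
    MessageId id = (MessageId) mid;
    id.incNumFails();
    // Ignore failures for tuples emitted in an older epoch.
    if (id.getEpoc() == currentEpoc.get()) {
        if (id.getNumFails() > CUT_OFF) {
            // Too many failures: give up on this tuple.
            markAsDone(id);
        } else {
            // Replay the same values with the same message id.
            collector.emit(id.getValues(), id);
        }
    }
}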
```
And ack would be similar:
```
public void ack(Object mid) {
    MessageId id = (MessageId) mid;
    // Only acks from the current epoch count.
    if (id.getEpoc() == currentEpoc.get()) {
        markAsDone(id);
    }
}
```
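To make the epoch idea concrete, here is a rough, self-contained sketch that runs without Storm or Kafka on the classpath. It is only an illustration under assumptions, not the spout's actual code: the class name `EpochReplaySketch`, the `emitNew` and `bumpEpoc` helpers, the `emitted` and `done` lists (standing in for `collector.emit` and `markAsDone`), the `String` used in place of `TopicPartition`, and the `CUT_OFF` value of 3 are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the spout logic; does not use Storm's API. */
final class EpochReplaySketch {
    static final int CUT_OFF = 3; // assumed retry limit

    /** Message id that carries everything needed to replay a tuple. */
    static final class MessageId {
        final String topicPart;    // stands in for TopicPartition
        final long offset;
        final long epoc;           // epoch in which the tuple was first emitted
        final List<Object> values;
        int numFails;

        MessageId(String topicPart, long offset, long epoc, List<Object> values) {
            this.topicPart = topicPart;
            this.offset = offset;
            this.epoc = epoc;
            this.values = values;
        }
    }

    final AtomicLong currentEpoc = new AtomicLong();
    final List<MessageId> emitted = new ArrayList<>(); // stand-in for collector.emit
    final List<MessageId> done = new ArrayList<>();    // stand-in for markAsDone

    /** Emits a fresh tuple tagged with the current epoch. */
    MessageId emitNew(String topicPart, long offset, List<Object> values) {
        MessageId id = new MessageId(topicPart, offset, currentEpoc.get(), values);
        emitted.add(id);
        return id;
    }

    /** "Blacklisting": every outstanding message id becomes stale at once. */
    void bumpEpoc() {
        currentEpoc.incrementAndGet();
    }

    void fail(Object mid) {
        MessageId id = (MessageId) mid;
        id.numFails++;
        if (id.epoc != currentEpoc.get()) {
            return; // stale epoch: drop silently, nothing to clean up
        }
        if (id.numFails > CUT_OFF) {
            done.add(id);    // give up after too many failures
        } else {
            emitted.add(id); // replay with the same message id
        }
    }

    void ack(Object mid) {
        MessageId id = (MessageId) mid;
        if (id.epoc == currentEpoc.get()) {
            done.add(id);
        }
    }
}
```

The point of the sketch is that after `bumpEpoc()` no per-tuple state has to be cleared: late acks and fails for old message ids simply fall through the epoch check.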
What do you think?