SHRAVANKUMAR DUBBUDU created STORM-2241:
-------------------------------------------
Summary: KafkaSpout implementation
Key: STORM-2241
URL: https://issues.apache.org/jira/browse/STORM-2241
Project: Apache Storm
Issue Type: Question
Components: storm-kafka
Affects Versions: 0.10.0
Reporter: SHRAVANKUMAR DUBBUDU
Priority: Minor
The Storm ISpout documentation says: 'Storm executes ack, fail, and nextTuple all on
the same thread. This means that an implementor of an ISpout does not need to
worry about concurrency issues between those methods. However, it also means
that an implementor must ensure that nextTuple is non-blocking: otherwise the
method could block acks and fails that are pending to be processed.'
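To make the quoted contract concrete, here is a toy model of a loop that alternates between handling acks and calling nextTuple() on one thread. It is hypothetical and is not Storm's executor code. The class and method names are made up for illustration, time is measured in logical ticks, and one ack is assumed to arrive from downstream on every tick. The model shows that the worst-case wait for an ack grows with however long nextTuple() holds the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical toy model (not Storm's executor code) of the single-thread
 * contract: acks can only be handled between nextTuple() calls, so a slow
 * nextTuple() directly delays ack processing. Time is in logical ticks.
 */
public class SingleThreadSpoutLoop {

    /**
     * Runs `iterations` rounds of "drain pending acks, then call nextTuple()".
     * nextTuple() holds the thread for `nextTupleCostTicks` ticks, and one ack
     * arrives on every tick. Returns the worst delay (in ticks) between an ack
     * arriving and the loop processing it.
     */
    public static long maxAckDelay(int iterations, int nextTupleCostTicks) {
        Deque<Long> pendingAckArrivals = new ArrayDeque<>();
        long clock = 0;
        long worstDelay = 0;
        for (int i = 0; i < iterations; i++) {
            // ack()/fail(): handle everything that queued up while the thread was busy.
            while (!pendingAckArrivals.isEmpty()) {
                long arrivedAt = pendingAckArrivals.poll();
                worstDelay = Math.max(worstDelay, clock - arrivedAt);
            }
            // nextTuple(): acks keep arriving but can only wait in the queue.
            for (int t = 0; t < nextTupleCostTicks; t++) {
                pendingAckArrivals.add(clock);
                clock++;
            }
        }
        return worstDelay;
    }

    public static void main(String[] args) {
        System.out.println("non-blocking nextTuple (1 tick):  worst ack delay = " + maxAckDelay(100, 1));
        System.out.println("blocking nextTuple (50 ticks):    worst ack delay = " + maxAckDelay(100, 50));
    }
}
```

In this model the worst ack delay equals the time one nextTuple() call occupies the thread: 1 tick for the non-blocking case, 50 ticks for the blocking one.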
However, KafkaSpout implements nextTuple() as follows:
    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        // Round-robin over the partitions assigned to this spout task.
        for (int i = 0; i < managers.size(); i++) {
            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                // next() emits the next buffered message, fetching from Kafka
                // first if this partition's buffer is empty.
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break; // emit from at most one partition per call
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }

        // Periodically commit the consumed offsets.
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }
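The partition-selection logic above can be reproduced in isolation. The sketch below keeps the same loop but replaces PartitionManager with a hypothetical in-memory FakePartition. It also drops the FailedFetchException handling and the periodic commit(), and uses a third state, EMITTED_END, assumed here to mean "emitted, buffer now empty". Each call emits from at most one partition, stays on a partition while it reports EMITTED_MORE_LEFT, and skips partitions that report NO_EMITTED. In the real spout, next() can also fetch from Kafka when its buffer is empty, and that fetch is the part of nextTuple() that may take time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RoundRobinSketch {

    // Mirrors the EmitState names used in the snippet; EMITTED_END is an assumption here.
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    /** Hypothetical stand-in for PartitionManager: an in-memory buffer, no Kafka fetches. */
    static final class FakePartition {
        private final Deque<String> buffer = new ArrayDeque<>();

        FakePartition(String... msgs) {
            buffer.addAll(List.of(msgs));
        }

        EmitState next(List<String> collector) {
            String msg = buffer.poll();
            if (msg == null) {
                return EmitState.NO_EMITTED;
            }
            collector.add(msg); // "emit" by appending to the collector list
            return buffer.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
        }
    }

    private final List<FakePartition> managers;
    private int currPartitionIndex = 0;

    RoundRobinSketch(List<FakePartition> managers) {
        this.managers = managers;
    }

    /** Same selection logic as the quoted nextTuple(), minus fetch errors and commits. */
    void nextTuple(List<String> collector) {
        for (int i = 0; i < managers.size(); i++) {
            currPartitionIndex = currPartitionIndex % managers.size();
            EmitState state = managers.get(currPartitionIndex).next(collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                // Advance only when this partition has nothing more buffered.
                currPartitionIndex = (currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break; // at most one emission per call
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch spout = new RoundRobinSketch(List.of(
                new FakePartition("a1", "a2"),
                new FakePartition(),
                new FakePartition("c1")));
        List<String> emitted = new ArrayList<>();
        for (int call = 0; call < 5; call++) {
            spout.nextTuple(emitted);
        }
        System.out.println(emitted); // [a1, a2, c1]
    }
}
```

With partitions holding [a1, a2], nothing, and [c1], five calls emit a1, a2, c1 in that order. The last two calls find every buffer empty and emit nothing.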
We are seeing events getting replayed when there is a slower bolt in the
topology chain, which causes duplicate messages.
Is there any way this can be fixed?
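A common cause of replays behind a slow bolt is offered here as an assumption, not as a diagnosis of this topology. Storm fails any tuple that is not fully acked within topology.message.timeout.secs (30 seconds by default), and the spout then re-emits it. Meanwhile, the original copy may still be waiting in the slow bolt's queue and get processed anyway. The setting topology.max.spout.pending caps how many tuples a spout task may have in flight. The toy simulation below is hypothetical: it uses made-up names and logical ticks, and it is not Storm code. It compares an uncapped spout with a capped one, both feeding a bolt that needs 3 ticks per tuple, with a 10-tick timeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class TimeoutReplaySketch {

    /** One copy of a tuple sitting in the bolt's input queue. */
    private record Copy(int msgId, int attempt) {}

    /** replays: re-emissions after a timeout; staleProcessed: copies the bolt handled after they had already failed. */
    public record Result(int replays, int staleProcessed) {}

    /** Hypothetical discrete-time model; maxPending <= 0 means "no cap". */
    public static Result simulate(int ticks, int boltTicksPerTuple, int timeoutTicks, int maxPending) {
        Deque<Copy> boltQueue = new ArrayDeque<>();
        Map<Integer, Integer> emitTime = new HashMap<>(); // msgId -> emit time of current attempt
        Map<Integer, Integer> attempt = new HashMap<>();  // msgId -> current attempt number
        int nextId = 0, replays = 0, stale = 0;

        for (int t = 0; t < ticks; t++) {
            // 1. Timeouts: un-acked tuples older than the timeout are failed and re-emitted.
            for (Map.Entry<Integer, Integer> e : emitTime.entrySet()) {
                if (t - e.getValue() >= timeoutTicks) {
                    int id = e.getKey();
                    int a = attempt.merge(id, 1, Integer::sum);
                    e.setValue(t);
                    boltQueue.addLast(new Copy(id, a)); // the old copy stays queued too
                    replays++;
                }
            }
            // 2. The slow bolt finishes one queued copy every boltTicksPerTuple ticks.
            if (t % boltTicksPerTuple == 0 && !boltQueue.isEmpty()) {
                Copy c = boltQueue.pollFirst();
                Integer current = attempt.get(c.msgId());
                if (current != null && current == c.attempt()) {
                    emitTime.remove(c.msgId()); // acked in time
                    attempt.remove(c.msgId());
                } else {
                    stale++; // work spent on a copy that was already failed and replayed
                }
            }
            // 3. The spout emits a new tuple unless the in-flight cap is reached.
            if (maxPending <= 0 || emitTime.size() < maxPending) {
                emitTime.put(nextId, t);
                attempt.put(nextId, 0);
                boltQueue.addLast(new Copy(nextId, 0));
                nextId++;
            }
        }
        return new Result(replays, stale);
    }

    public static void main(String[] args) {
        System.out.println("no cap:        " + simulate(60, 3, 10, 0)); // replays > 0
        System.out.println("max pending 2: " + simulate(60, 3, 10, 2)); // Result[replays=0, staleProcessed=0]
    }
}
```

In this model, the uncapped spout outruns the bolt, so queued tuples exceed the timeout and are replayed while their stale copies are still processed. The capped spout never lets a tuple wait long enough to time out. The right cap in a real topology would depend on the actual bolt latency and timeout.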
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)