SHRAVANKUMAR DUBBUDU created STORM-2241:
-------------------------------------------

             Summary: KafkaSpout implementation
                 Key: STORM-2241
                 URL: https://issues.apache.org/jira/browse/STORM-2241
             Project: Apache Storm
          Issue Type: Question
          Components: storm-kafka
    Affects Versions: 0.10.0
            Reporter: SHRAVANKUMAR DUBBUDU
            Priority: Minor


The Storm ISpout documentation says: 'Storm executes ack, fail, and nextTuple all on 
the same thread. This means that an implementor of an ISpout does not need to 
worry about concurrency issues between those methods. However, it also means 
that an implementor must ensure that nextTuple is non-blocking: otherwise the 
method could block acks and fails that are pending to be processed.'
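
To make the quoted threading model concrete, here is a minimal sketch that does not use Storm at all. It is a simplified model of an executor loop, not Storm's actual implementation; `SpoutLoopSketch`, `MiniSpout`, and `runLoop` are hypothetical names introduced only for illustration. It shows that when all three callbacks share one thread, queued acks are handled only between `nextTuple()` calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Storm-free simulation of a spout executor loop; all names here are hypothetical. */
public class SpoutLoopSketch {
    /** Minimal stand-in for the three ISpout callbacks discussed above. */
    interface MiniSpout {
        void nextTuple();
        void ack(Object msgId);
        void fail(Object msgId);
    }

    /** Runs the callbacks on the calling thread and records the order of calls. */
    public static List<String> runLoop(Queue<Object> pendingAcks, int iterations) {
        List<String> trace = new ArrayList<>();
        MiniSpout spout = new MiniSpout() {
            public void nextTuple() { trace.add("nextTuple"); }
            public void ack(Object msgId) { trace.add("ack:" + msgId); }
            public void fail(Object msgId) { trace.add("fail:" + msgId); }
        };
        for (int i = 0; i < iterations; i++) {
            // Acks are only drained between nextTuple() calls, so a nextTuple()
            // that blocks would delay every ack queued behind it.
            Object msgId;
            while ((msgId = pendingAcks.poll()) != null) {
                spout.ack(msgId);
            }
            spout.nextTuple();
        }
        return trace;
    }

    public static void main(String[] args) {
        Queue<Object> acks = new ArrayDeque<>();
        acks.add("m1");
        acks.add("m2");
        // Prints [ack:m1, ack:m2, nextTuple, nextTuple]
        System.out.println(runLoop(acks, 2));
    }
}
```

In this model, any time spent inside `nextTuple()` is time during which no ack or fail can be processed, which is the concern the documentation raises.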

However, KafkaSpout has the following nextTuple() implementation:

    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

We are seeing events replayed when there is a slow bolt in the topology 
chain, which results in duplicate messages.

Is there any way to fix this?
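
One thing that may be worth checking is whether the replays are simply tuple timeouts. This is an assumption, not something confirmed for this topology: Storm fails and replays a tuple that is not fully acked within `topology.message.timeout.secs` (30 seconds by default), and `topology.max.spout.pending` bounds how many tuples a spout task keeps in flight. The sketch below is only back-of-the-envelope arithmetic with made-up numbers; `ReplayEstimateSketch` and its methods are hypothetical helpers, not part of Storm.

```java
/** Back-of-the-envelope check; the numbers below are made-up examples, not measurements. */
public class ReplayEstimateSketch {
    /**
     * Worst-case wait, in ms, for the last of `pending` in-flight tuples when a
     * single bolt task processes them one at a time.
     */
    public static long worstCaseWaitMs(int pending, long boltMsPerTuple) {
        return (long) pending * boltMsPerTuple;
    }

    /** True if that wait exceeds the message timeout, so a replay would be expected. */
    public static boolean likelyToTimeOut(int pending, long boltMsPerTuple, int timeoutSecs) {
        return worstCaseWaitMs(pending, boltMsPerTuple) > timeoutSecs * 1000L;
    }

    public static void main(String[] args) {
        // 1000 pending tuples at 50 ms each is 50 s, above a 30 s timeout.
        System.out.println(likelyToTimeOut(1000, 50, 30)); // prints "true"
        // 500 pending tuples at 50 ms each is 25 s, below it.
        System.out.println(likelyToTimeOut(500, 50, 30));  // prints "false"
    }
}
```

If the estimate exceeds the timeout, the duplicates could come from timed-out tuples being replayed rather than from the spout's nextTuple() logic.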





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
