Github user jianbzhou commented on the pull request:

    https://github.com/apache/storm/pull/1131#issuecomment-217776015
  
    Hi Hmcl,
    
    During our testing we found that the poll method was sometimes not called for a long time. I suspect it is caused by the condition below:
    
    private boolean poll() {
        return !waitingToEmit() && **numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets()**;
    }
    
    I found that numUncommittedOffsets is incremented in either of the situations below:
    1.  !retryService.isScheduled(msgId): this is the most common situation. A message was polled and is not in toRetryMsg, so it is a normal emit rather than a retry;
    2.  retryService.isReady(msgId): the message was emitted previously and will now be re-emitted as per the retry logic.
    
    As per the logic below, take one message, 50: the first time it is polled and emitted, numUncommittedOffsets is incremented by 1. If this message then fails and is retried 10 times, numUncommittedOffsets is incremented by 11 in total.
    
    
    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        ………………………………
        else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
            final List<Object> tuple = tuplesBuilder.buildTuple(record);
            kafkaSpoutStreams.emit(collector, tuple, msgId);
            emitted.add(msgId);
            **numUncommittedOffsets++;**
        ………………………………
    }
    
    But as per the logic below, after a successful commit, numUncommittedOffsets is reduced only by the actual number of messages that got committed. If it commits the one message 50, it subtracts only 1 instead of 11.
    
    public void commit(OffsetAndMetadata committedOffset) {
        ………………………………
            **numUncommittedOffsets-= numCommittedOffsets;**
        ………………………………
    }
    
    Under some circumstances, say a rebalance happens and we seek back to a very small/early offset, it seems emitTupleIfNotEmitted would drive numUncommittedOffsets up to quite a big number. Eventually it becomes greater than kafkaSpoutConfig.getMaxUncommittedOffsets(), and the poll() method is no longer called.
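
    The bookkeeping described above can be sketched as a small standalone model. This is only a toy under stated assumptions, not the actual KafkaSpout code: the class UncommittedCounterDrift and its methods emit, commitUpTo and pollAllowed are hypothetical names, message 50 is treated as offset 50, the limit of 10 is an arbitrary value standing in for getMaxUncommittedOffsets(), and the waitingToEmit() part of the poll condition is ignored.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the counter bookkeeping discussed above; NOT the real KafkaSpout. */
public class UncommittedCounterDrift {
    // Hypothetical stand-in for kafkaSpoutConfig.getMaxUncommittedOffsets().
    private final int maxUncommittedOffsets;
    private int numUncommittedOffsets = 0;
    // Distinct offsets that were emitted and are not yet committed.
    private final Set<Long> pendingOffsets = new HashSet<>();

    public UncommittedCounterDrift(int maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    /** Models the quoted emit path: every emit, first or retry, increments the counter. */
    public void emit(long offset) {
        pendingOffsets.add(offset);
        numUncommittedOffsets++;
    }

    /** Models the quoted commit path: subtracts only the number of distinct offsets committed. */
    public void commitUpTo(long offset) {
        int before = pendingOffsets.size();
        pendingOffsets.removeIf(o -> o <= offset);
        numUncommittedOffsets -= before - pendingOffsets.size();
    }

    /** Models the counter half of the quoted poll() condition. */
    public boolean pollAllowed() {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    public int counter() {
        return numUncommittedOffsets;
    }

    public int pending() {
        return pendingOffsets.size();
    }

    public static void main(String[] args) {
        UncommittedCounterDrift spout = new UncommittedCounterDrift(10);
        spout.emit(50L);                          // first, normal emit
        for (int retry = 0; retry < 10; retry++) {
            spout.emit(50L);                      // ten retries of the same message
        }
        spout.commitUpTo(50L);                    // the commit subtracts only 1
        System.out.println("counter=" + spout.counter()
                + " pending=" + spout.pending()
                + " pollAllowed=" + spout.pollAllowed());
        // prints: counter=10 pending=0 pollAllowed=false
    }
}
```

    In this model nothing is left uncommitted after the commit, yet the counter stays at 10 and the poll gate stays closed, which matches the symptom we observed if the real code behaves the same way.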
    
    
    I am not sure whether I understand your code correctly or am missing something. Could you please confirm whether the above situation is possible?
    
    Please feel free to let me know if you need any further info and thanks for 
your help.


