Github user jianbzhou commented on the pull request:
https://github.com/apache/storm/pull/1131#issuecomment-217776015
Hi Hmcl,
During our testing we found that sometimes the poll method was not called for
a long time. I suspect it is caused by the condition below:
private boolean poll() {
    return !waitingToEmit()
            && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); // <-- suspected condition
}
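To make the gating concrete, here is a minimal, hypothetical model of the condition above. The class PollGate and its fields are illustrative stand-ins, not the real KafkaSpout, and maxUncommittedOffsets replaces kafkaSpoutConfig.getMaxUncommittedOffsets(). Once the counter reaches the limit, poll() returns false until something decrements the counter:

```java
// Hypothetical model of the poll() gate; not the actual KafkaSpout class.
class PollGate {
    private final long maxUncommittedOffsets; // stands in for kafkaSpoutConfig.getMaxUncommittedOffsets()
    long numUncommittedOffsets = 0;
    boolean waitingToEmit = false;

    PollGate(long maxUncommittedOffsets) {
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    // Same shape as the quoted condition: poll only if nothing is waiting
    // to be emitted and the counter is strictly below the limit.
    boolean poll() {
        return !waitingToEmit && numUncommittedOffsets < maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("numUncommittedOffsets=" + gate.numUncommittedOffsets
                    + " poll()=" + gate.poll());
            gate.numUncommittedOffsets++; // every emit bumps the counter
        }
    }
}
```

With a limit of 3, poll() is allowed at counter values 0, 1 and 2 and refused from 3 onward.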
I found numUncommittedOffsets is incremented in either of the following
situations:
1. !retryService.isScheduled(msgId): this is the most common situation. A
message was polled and it is not in toRetryMsg, so it is a normal emit rather
than a retry;
2. retryService.isReady(msgId): the message was emitted previously and is now
being re-emitted according to the retry logic.
Per the logic below, take message 50 as an example: the first time it is polled
and emitted, numUncommittedOffsets is incremented by 1. If this message then
fails and is retried 10 times, numUncommittedOffsets is incremented by 11 in
total.
private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
    // ...
    else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
        // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
        final List<Object> tuple = tuplesBuilder.buildTuple(record);
        kafkaSpoutStreams.emit(collector, tuple, msgId);
        emitted.add(msgId);
        numUncommittedOffsets++;   // <-- incremented on every emit, including retries
    // ...
}
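The counting described above can be reproduced with a small, hypothetical model. EmitCounter is illustrative only, and emit() stands for one pass through the quoted else-if branch. One first emit plus 10 retries of the same message yields 11 increments:

```java
// Hypothetical model of the emit branch; not the actual KafkaSpout code.
class EmitCounter {
    long numUncommittedOffsets = 0;

    // One pass through the "not scheduled, or ready to retry" branch.
    void emit() {
        numUncommittedOffsets++; // incremented on the first emit AND on every retry
    }

    // Emits one message once, then retries it `retries` times.
    static long incrementsForOneOffset(int retries) {
        EmitCounter c = new EmitCounter();
        c.emit();                      // first emit: never scheduled
        for (int i = 0; i < retries; i++) {
            c.emit();                  // retry: scheduled and ready
        }
        return c.numUncommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(incrementsForOneOffset(10)); // prints 11
    }
}
```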
But per the logic below, after a successful commit numUncommittedOffsets is
decreased by the actual number of messages that were committed. If it commits
the single message 50, it only subtracts 1 instead of 11.
public void commit(OffsetAndMetadata committedOffset) {
    // ...
    numUncommittedOffsets -= numCommittedOffsets;   // <-- one per committed message
    // ...
}
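Putting the increment and the decrement side by side shows the suspected drift. In this hypothetical sketch (CounterDrift and retriesPerOffset are illustrative names), each message is emitted once plus once per retry, and every message is eventually committed; whatever remains in the counter afterwards is never given back:

```java
// Hypothetical model of the suspected accounting mismatch; not KafkaSpout code.
class CounterDrift {
    // retriesPerOffset[i] = how many times message i was retried before it was
    // acked. Assumes every message is eventually committed.
    static long leftoverAfterCommit(int[] retriesPerOffset) {
        long numUncommittedOffsets = 0;
        for (int retries : retriesPerOffset) {
            numUncommittedOffsets += 1 + retries; // first emit + one per retry
        }
        long numCommittedOffsets = retriesPerOffset.length; // one per distinct message
        numUncommittedOffsets -= numCommittedOffsets;
        return numUncommittedOffsets; // 0 would mean the counter balances
    }

    public static void main(String[] args) {
        System.out.println(leftoverAfterCommit(new int[] {10}));
        System.out.println(leftoverAfterCommit(new int[] {0, 0, 3}));
    }
}
```

The first call models message 50 (10 retries) and leaves 10 behind; the second leaves 3, exactly the total number of retries, so in this model every retry leaks one unit of the counter.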
Under some circumstances, say a rebalance happens and we seek back to a very
small/early offset, it seems emitTupleIfNotEmitted would push
numUncommittedOffsets to quite a large number. Eventually it would become
greater than or equal to kafkaSpoutConfig.getMaxUncommittedOffsets(), and the
poll() method would no longer be called.
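The scenario can be simulated roughly as follows. This is a hypothetical sketch with made-up parameters: emitsPerRound counts every emit (first emits, retries, and re-emits after seeking back), while commitsPerRound counts distinct committed messages. Whenever emits outpace commits, the counter climbs until the poll() condition closes:

```java
// Hypothetical simulation; parameters are illustrative, not measured values.
class StallAfterSeek {
    // Each round: if poll() would be allowed, emit `emitsPerRound` tuples
    // (each increments the counter) and commit `commitsPerRound` distinct
    // messages (each decrements it). Returns the first round in which poll()
    // would be refused, or -1 if that never happens within maxRounds.
    static int roundsUntilStall(long max, int emitsPerRound, int commitsPerRound, int maxRounds) {
        long numUncommittedOffsets = 0;
        for (int round = 0; round < maxRounds; round++) {
            if (numUncommittedOffsets >= max) {
                return round; // !(numUncommittedOffsets < max): poll() returns false
            }
            numUncommittedOffsets += emitsPerRound;
            numUncommittedOffsets -= commitsPerRound;
        }
        return -1;
    }

    public static void main(String[] args) {
        // 11 emits per committed message, as in the "message 50" example
        System.out.println(roundsUntilStall(100, 11, 1, 1000));
        // balanced accounting never stalls
        System.out.println(roundsUntilStall(100, 5, 5, 1000));
    }
}
```

With a limit of 100 and a net gain of 10 per round, the first call reports a stall at round 10; the balanced case returns -1.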
I am not sure whether I understand your code correctly or am missing something.
Could you please help confirm whether the above situation is possible?
Please feel free to let me know if you need any further info, and thanks for
your help.