Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2380#discussion_r147013717
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException()
{
}
private boolean commit() {
- return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();
// timer != null for non auto commit mode
+ return isAtLeastOnceProcessing() &&
commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
}
private boolean poll() {
final int maxUncommittedOffsets =
kafkaSpoutConfig.getMaxUncommittedOffsets();
final int readyMessageCount = retryService.readyMessageCount();
final boolean poll = !waitingToEmit()
- //Check that the number of uncommitted, nonretriable tuples is
less than the maxUncommittedOffsets limit
- //Accounting for retriable tuples this way still guarantees
that the limit is followed on a per partition basis,
- //and prevents locking up the spout when there are too many
retriable tuples
- && (numUncommittedOffsets - readyMessageCount <
maxUncommittedOffsets
- || !isAtLeastOnce());
+ // Check that the number of uncommitted, non-retriable
tuples is less than the maxUncommittedOffsets limit.
--- End diff --
Done.
---