Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147013717

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -255,26 +255,25 @@ private void throwKafkaConsumerInterruptedException() {
         }

         private boolean commit() {
    -        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
    +        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
         }

         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
             final int readyMessageCount = retryService.readyMessageCount();
             final boolean poll = !waitingToEmit()
    -            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
    -            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
    -            //and prevents locking up the spout when there are too many retriable tuples
    -            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
    -            || !isAtLeastOnce());
    +            // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
    --- End diff --

    Done.
---