GitHub user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2380#discussion_r147016997

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
         // ======== Ack =======
         @Override
         public void ack(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of acked tuples if commits are done based on acks
    -            return;
    -        }
    -
    +        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    -                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    -                    + "came from a topic-partition that this consumer group instance is no longer tracking "
    -                    + "due to rebalance/partition reassignment. No action taken.", msgId);
    +        if (isAtLeastOnceProcessing()) {
    +            if (!emitted.contains(msgId)) {
    +                if (msgId.isEmitted()) {
    +                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
    +                        + "came from a topic-partition that this consumer group instance is no longer tracking "
    +                        + "due to rebalance/partition reassignment. No action taken.", msgId);
    +                } else {
    +                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                }
                 } else {
    -                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
    +                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    +                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
    +                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    +                emitted.remove(msgId);
                 }
    -        } else {
    -            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
    -                + " This should never occur barring errors in the RetryService implementation or the spout code.");
    -            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
    -            emitted.remove(msgId);
    +            tupleListener.onAck(msgId);
             }
    -        tupleListener.onAck(msgId);
         }
     
         // ======== Fail =======
         @Override
         public void fail(Object messageId) {
    -        if (!isAtLeastOnce()) {
    -            // Only need to keep track of failed tuples if commits are done based on acks
    -            return;
    -        }
    +        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
    --- End diff --

    Done.
---