Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/2380#discussion_r147015407
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -438,55 +440,53 @@ private void commitOffsetsForAckedTuples() {
     // ======== Ack =======
     @Override
     public void ack(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of acked tuples if commits are done based on acks
-            return;
-        }
-
+        // Only need to keep track of acked tuples if commits to Kafka are done after a tuple ack is received
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if (!emitted.contains(msgId)) {
-            if (msgId.isEmitted()) {
-                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-                    + "came from a topic-partition that this consumer group instance is no longer tracking "
-                    + "due to rebalance/partition reassignment. No action taken.", msgId);
+        if (isAtLeastOnceProcessing()) {
+            if (!emitted.contains(msgId)) {
+                if (msgId.isEmitted()) {
+                    LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+                        + "came from a topic-partition that this consumer group instance is no longer tracking "
+                        + "due to rebalance/partition reassignment. No action taken.", msgId);
+                } else {
+                    LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+                }
             } else {
-                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+                Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
+                emitted.remove(msgId);
             }
-        } else {
-            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-                + " This should never occur barring errors in the RetryService implementation or the spout code.");
-            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
-            emitted.remove(msgId);
+            tupleListener.onAck(msgId);
         }
-        tupleListener.onAck(msgId);
     }

     // ======== Fail =======
     @Override
     public void fail(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of failed tuples if commits are done based on acks
-            return;
-        }
+        // Only need to keep track of failed tuples if commits to Kafka are done after a tuple ack is received
+        if (isAtLeastOnceProcessing()) {
--- End diff --
Same comment as above.
---