GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2380#discussion_r147050462
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -336,22 +335,25 @@ private void emit() {
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
+
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
- } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
+ } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
- Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
- "The spout is about to emit a message that has already been committed."
- + " This should never occur, and indicates a bug in the spout");
+ if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
--- End diff --
Makes sense, thanks.
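For readers following the change, here is a minimal, dependency-free Java sketch of the branch order in `emitTupleIfNotEmitted` as shown in the diff. It assumes plain `long` offsets in place of `KafkaSpoutMessageId`, two sets in place of `offsetManagers` and `emitted`, and a nullable `Long` in place of `kafkaConsumer.committed(tp)`; the class, enum and method names are hypothetical. Note that the new `if` condition is the logical negation of the predicate the removed `Validate.isTrue` call asserted, so it is true in exactly the case where the old assertion failed. The body of that new branch is cut off in the diff, so the sketch only reports that the case was hit rather than guessing what the spout does there.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: plain offsets stand in for KafkaSpoutMessageId, two
// sets stand in for offsetManagers/emitted, and a nullable Long stands in
// for kafkaConsumer.committed(tp).offset().
final class EmitGuardSketch {

    enum Decision { SKIP_ACKED, SKIP_EMITTED, COMMITTED_AT_OR_PAST_POSITION, EMIT }

    // Same predicate as the new `if`: committed != null && committed >= position.
    // It is the negation of the old Validate.isTrue(committed == null || committed < position).
    static boolean committedAtOrPastPosition(Long committedOffset, long position) {
        return committedOffset != null && committedOffset >= position;
    }

    // Mirrors the branch order of emitTupleIfNotEmitted: acked, then emitted,
    // then (inside the final else) the committed-offset check. The real
    // spout's behavior in that last branch is not shown, so it is only reported.
    static Decision decide(long offset, Set<Long> acked, Set<Long> emitted,
                           Long committedOffset, long position) {
        if (acked.contains(offset)) {
            return Decision.SKIP_ACKED;
        } else if (emitted.contains(offset)) {
            return Decision.SKIP_EMITTED;
        } else if (committedAtOrPastPosition(committedOffset, position)) {
            return Decision.COMMITTED_AT_OR_PAST_POSITION;
        }
        return Decision.EMIT;
    }

    public static void main(String[] args) {
        Set<Long> acked = new HashSet<>(Arrays.asList(3L));
        Set<Long> emitted = new HashSet<>(Arrays.asList(4L));
        System.out.println(decide(3L, acked, emitted, null, 10L)); // SKIP_ACKED
        System.out.println(decide(4L, acked, emitted, null, 10L)); // SKIP_EMITTED
        System.out.println(decide(7L, acked, emitted, 8L, 7L));    // COMMITTED_AT_OR_PAST_POSITION
        System.out.println(decide(7L, acked, emitted, 5L, 7L));    // EMIT
    }
}
```

Turning the assertion into an ordinary condition means the committed-offset case is handled as a regular branch of the method instead of aborting through `Validate.isTrue`.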
---