Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2380#discussion_r147014032 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -336,22 +335,25 @@ private void emit() { private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset()); + if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); - } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail + } else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { - Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp), - "The spout is about to emit a message that has already been committed." - + " This should never occur, and indicates a bug in the spout"); + if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) { --- End diff -- Validate throws an IllegalArgumentException where the correct exception here is IllegalStateException. Furthermore, Validate in my opinion as a confusing API - Validate(true) throws an exception if false. It is misleading to me. I would rather leave it like this.
---