Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157346484
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -399,14 +429,16 @@ private void emitIfWaitingNotEmitted() {
          */
         private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
             final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    -        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
    +        final KafkaSpoutMessageId msgId = retryService.getMessageId(record, context);
     
             if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
                 LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
             } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
                 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
             } else {
    -            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
    +            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
    +            if (isOffsetCommittedByThisTopology(committedOffset) && committedOffset.offset() > kafkaConsumer.position(tp)) {
    +                // this check should pass if commit was done by another topology such that FirstPollOffsetStrategy == EARLIEST is honored (STORM-2844)
    --- End diff --
    
    Nit: Describe when this check is valid instead, e.g. "It should never be possible for the consumer position to fall behind the offset committed by this topology".
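
    For illustration, the reworded comment could sit on the check roughly as in the sketch below. This is a minimal, self-contained approximation and not the actual KafkaSpout code: the class `CommittedOffsetCheck`, the method `isPositionBehindOwnCommit`, and the boolean parameter standing in for `isOffsetCommittedByThisTopology(committedOffset)` are all hypothetical names made up for the example.

```java
// Hypothetical sketch of the check under review; it does not use the Kafka or Storm APIs.
public class CommittedOffsetCheck {

    /**
     * Returns true when the consumer position is behind an offset that this
     * topology itself committed, which indicates an invalid state.
     *
     * @param committedOffset         last committed offset for the partition, or null if none
     * @param committedByThisTopology assumed stand-in for isOffsetCommittedByThisTopology(...)
     * @param position                current consumer position for the partition
     */
    public static boolean isPositionBehindOwnCommit(Long committedOffset,
                                                    boolean committedByThisTopology,
                                                    long position) {
        // It should never be possible for the consumer position to fall behind
        // the offset committed by this topology. Commits made by another
        // topology are exempt, so that FirstPollOffsetStrategy == EARLIEST is
        // still honored (STORM-2844).
        return committedOffset != null
            && committedByThisTopology
            && committedOffset > position;
    }

    public static void main(String[] args) {
        // Own commit at 10, position 5: the invariant is violated.
        System.out.println(isPositionBehindOwnCommit(10L, true, 5L));
        // Commit at 10 made by another topology: allowed.
        System.out.println(isPositionBehindOwnCommit(10L, false, 5L));
        // Nothing committed yet: allowed.
        System.out.println(isPositionBehindOwnCommit(null, true, 5L));
    }
}
```

    The point of the rewording is that the comment states the invariant being enforced, and mentions the other-topology case only as the exemption.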

