Github user reiabreu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2593#discussion_r178559146
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -570,20 +572,25 @@ public void ack(Object messageId) {
             // Only need to keep track of acked tuples if commits to Kafka are controlled by
             // tuple acks, which happens only for at-least-once processing semantics
             final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    -        if (!emitted.contains(msgId)) {
    -            if (msgId.isEmitted()) {
    +        if (!msgId.isNullTuple()) {
    --- End diff --
    
    Sounds good, will update it.
    Regarding tests, we need to add at least one:
    
    Test that all messages are committed when every tuple is null and the spout is not set to emit null tuples
    
    ```
    @Test
    public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Exception {
        final int messageCount = 10;
        prepareSpout(messageCount);

        // All null tuples should be committed, meaning they were treated as emitted and acked
        for (int i = 0; i < messageCount; i++) {
            spout.nextTuple();
        }

        // No tuple should ever reach the collector
        verify(collectorMock, never()).emit(
                anyString(),
                anyList(),
                any());

        // Advance past the commit period so the next call commits offsets
        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
        spout.nextTuple();

        verifyAllMessagesCommitted(messageCount);
    }
    ```
    We would need a new SingleTopicKafkaSpoutConfiguration with something like:
    
    ```
    private static class NullRecordExtractor<K, V> implements RecordTranslator<K, V> {

        // Returning null marks every record as a null tuple
        @Override
        public List<Object> apply(ConsumerRecord<K, V> record) {
            return null;
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return new Fields("topic", "key", "value");
        }
    }
    ```
    
    I was planning to extend KafkaSpoutAbstractTest in a new test class similar to KafkaSpoutSingleTopicTest.
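    
    To make the expected behaviour concrete, here is a dependency-free toy model of what the test should pin down. It is only a sketch, not the KafkaSpout code: NullTupleAckModel, its emitNullTuples flag and the Function-based translator are hypothetical stand-ins, and the commit rule (commit up to the end of the contiguous acked prefix) is my simplified assumption of how offsets advance.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;
    import java.util.function.Function;
    
    /** Toy stand-in for the spout's ack bookkeeping; NOT the real KafkaSpout. */
    class NullTupleAckModel {
        // Hypothetical flag mirroring the spout's "emit null tuples" setting
        private final boolean emitNullTuples;
        // Hypothetical stand-in for a RecordTranslator: maps an offset to a tuple (or null)
        private final Function<Long, List<Object>> translator;
        private final TreeSet<Long> acked = new TreeSet<>();
        private final List<List<Object>> emitted = new ArrayList<>();
    
        NullTupleAckModel(boolean emitNullTuples, Function<Long, List<Object>> translator) {
            this.emitNullTuples = emitNullTuples;
            this.translator = translator;
        }
    
        /** Handles the record at the given offset. */
        void process(long offset) {
            List<Object> tuple = translator.apply(offset);
            if (tuple != null || emitNullTuples) {
                // Emitted tuples wait for a downstream ack before they can be committed
                emitted.add(tuple);
            } else {
                // Null tuple that is not emitted: count it as acked right away
                acked.add(offset);
            }
        }
    
        /** Downstream ack for an emitted tuple. */
        void ack(long offset) {
            acked.add(offset);
        }
    
        /** First offset after the contiguous acked prefix starting at 0. */
        long nextCommitOffset() {
            long next = 0;
            while (acked.contains(next)) {
                next++;
            }
            return next;
        }
    
        int emittedCount() {
            return emitted.size();
        }
    
        public static void main(String[] args) {
            // Translator that always returns null, like NullRecordExtractor
            NullTupleAckModel model = new NullTupleAckModel(false, offset -> null);
            for (long offset = 0; offset < 10; offset++) {
                model.process(offset);
            }
            System.out.println(model.emittedCount());     // prints 0
            System.out.println(model.nextCommitOffset()); // prints 10
        }
    }
    ```
    
    With emitNullTuples set to true, the same loop would emit ten null tuples, and nothing would be committable until each one is acked.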


