Github user reiabreu commented on a diff in the pull request:

https://github.com/apache/storm/pull/2593#discussion_r178559146

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -570,20 +572,25 @@ public void ack(Object messageId) {
         // Only need to keep track of acked tuples if commits to Kafka are controlled by
         // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if (!emitted.contains(msgId)) {
-            if (msgId.isEmitted()) {
+        if (!msgId.isNullTuple()) {
--- End diff --

Sounds good, will update it.

Regarding tests, we need to add at least one: test that all messages are committed for all null tuples when the spout is not set to emit null tuples.

```
@Test
public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Exception {
    final int messageCount = 10;
    prepareSpout(messageCount);

    // All null tuples should be committed, meaning they were considered to be emitted and acked
    for (int i = 0; i < messageCount; i++) {
        spout.nextTuple();
    }

    verify(collectorMock, never()).emit(
        anyString(),
        anyList(),
        any());

    Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    // Commit offsets
    spout.nextTuple();

    verifyAllMessagesCommitted(messageCount);
}
```

We would need a new SingleTopicKafkaSpoutConfiguration with something like:

```
private static class NullRecordExtractor implements RecordTranslator {

    @Override
    public List<Object> apply(ConsumerRecord record) {
        return null;
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("topic", "key", "value");
    }

    @Override
    public Object apply(Object record) {
        return null;
    }
}
```

I was planning to extend KafkaSpoutAbstractTest in a test class similar to KafkaSpoutSingleTopicTest.
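
For readers following the thread, here is a minimal sketch of the behaviour the proposed test is meant to check. It is a toy model and does not use the real Storm classes: `NullTupleSketch`, `ToySpout`, `process`, and `runWithNullTranslator` are hypothetical names, and a plain `Function` stands in for the `RecordTranslator`. The assumption, taken from the test comment above, is that a record translated to null with null-tuple emission disabled is never emitted but is still treated as acked so its offset can be committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model of the null-tuple handling discussed above; this is not Storm code. */
public class NullTupleSketch {

    /** Hypothetical stand-in for a spout that tracks emitted and committable offsets. */
    static final class ToySpout {
        // Stands in for a RecordTranslator: maps a record value to a tuple, or to null.
        private final Function<String, List<Object>> translator;
        private final boolean emitNullTuples;
        final List<Long> emittedOffsets = new ArrayList<>();
        final List<Long> committableOffsets = new ArrayList<>();

        ToySpout(Function<String, List<Object>> translator, boolean emitNullTuples) {
            this.translator = translator;
            this.emitNullTuples = emitNullTuples;
        }

        /** Processes one record and returns true if a tuple was emitted downstream. */
        boolean process(long offset, String value) {
            List<Object> tuple = translator.apply(value);
            if (tuple == null && !emitNullTuples) {
                // Nothing is emitted, so no ack will ever arrive for this offset.
                // Treat it as acked immediately so that it can still be committed.
                committableOffsets.add(offset);
                return false;
            }
            emittedOffsets.add(offset);
            return true;
        }

        /** Called when downstream acks an emitted tuple. */
        void ack(long offset) {
            if (emittedOffsets.remove(Long.valueOf(offset))) {
                committableOffsets.add(offset);
            }
        }
    }

    /** Feeds messageCount records through a translator that always returns null. */
    static ToySpout runWithNullTranslator(int messageCount) {
        ToySpout spout = new ToySpout(value -> null, false);
        for (long offset = 0; offset < messageCount; offset++) {
            spout.process(offset, "value-" + offset);
        }
        return spout;
    }

    public static void main(String[] args) {
        ToySpout spout = runWithNullTranslator(10);
        System.out.println("emitted=" + spout.emittedOffsets.size()
            + " committable=" + spout.committableOffsets.size());
    }
}
```

In this model, ten records pass through without a single emit, yet all ten offsets end up committable, which mirrors the `verify(collectorMock, never()).emit(...)` and `verifyAllMessagesCommitted(messageCount)` assertions in the proposed test.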
---