[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400983#comment-16400983 ]
Stig Rohde Døssing commented on STORM-2994:
-------------------------------------------

Yes, I think that's the bug. It looks like there was a mistake when the null filtering code was added. See https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L602.

Here's a quick summary of how we got to this code, and what I think is wrong.

Originally the offset manager simply kept track of acked tuples, and committed tuples once there was a complete sequence of acked tuples to commit (e.g. if 1,2,3,5,6 were acked, the offset manager would commit 1,2,3 and wait for 4 to be acked).

A later PR added support for topic compaction, i.e. topics where an offset sequence might look like 1,2,3,5,6, with offset 4 missing entirely. The offset manager now allows skipping offset 4 if it is known that the spout never emitted it.

When null tuple filtering was added, I think this functionality wasn't kept in mind. When a null tuple is filtered out, it's simply discarded without being added to the offset manager's emitted list or its acked list. As a result, the offset manager is never told about the null tuples, so it won't commit them either. When the next non-null tuple gets acked, the offset manager is told about it. From the offset manager's perspective, it now looks like there was a gap of offsets that the spout never emitted, so it reacts by committing past them. Unfortunately this only happens once a non-null tuple is acked, which can take arbitrarily long.

I think the fix should be fairly simple: we add the null-filtered tuples to the offset manager's acked and emitted lists, so that null filtering behaves as if the null tuples were acked immediately.
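To make the failure mode concrete, here is a minimal Java sketch of the bookkeeping described above for a single partition. The names OffsetTrackingSketch, ToyOffsetManager and simulate are hypothetical; this is not Storm's OffsetManager API, and the sketch assumes every emitted tuple is acked immediately and ignores retries. commit() advances over contiguous acked offsets and, mirroring the compaction support, skips a gap only when none of the offsets in it were emitted. simulate(false) silently drops null tuples as described above; simulate(true) applies the proposed fix of marking them as emitted and acked right away.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class OffsetTrackingSketch {

    /** Hypothetical single-partition model of the spout's offset bookkeeping (not Storm's OffsetManager). */
    public static final class ToyOffsetManager {
        private final NavigableSet<Long> emitted = new TreeSet<>();
        private final NavigableSet<Long> acked = new TreeSet<>();
        // Kafka convention: the committed offset is the next offset to consume.
        private long committed;

        public ToyOffsetManager(long initialCommitted) {
            this.committed = initialCommitted;
        }

        public void emit(long offset) { emitted.add(offset); }

        public void ack(long offset) { acked.add(offset); }

        /** Advances the commit point as far as the acked offsets allow and returns it. */
        public long commit() {
            long next = committed;
            for (long offset : acked.tailSet(next, true)) {
                if (offset == next) {
                    next = offset + 1; // contiguous ack: advance
                } else if (emitted.subSet(next, true, offset, false).isEmpty()) {
                    // None of [next, offset) was emitted (e.g. compacted away), so skip the gap.
                    next = offset + 1;
                } else {
                    break; // an emitted offset below this one is still un-acked
                }
            }
            committed = next;
            // Forget bookkeeping for offsets that are now committed.
            acked.headSet(next, false).clear();
            emitted.headSet(next, false).clear();
            return committed;
        }
    }

    /**
     * Offsets 0..3 hold "a", "b", null, null; a non-null record at offset 4 arrives later.
     * Returns the commit point before and after offset 4 is acked.
     */
    public static long[] simulate(boolean recordFilteredNulls) {
        ToyOffsetManager mgr = new ToyOffsetManager(0);
        String[] firstPoll = {"a", "b", null, null};
        for (int i = 0; i < firstPoll.length; i++) {
            long offset = i;
            if (firstPoll[i] != null || recordFilteredNulls) {
                // Non-null tuples are emitted (and, by assumption, acked at once).
                // With the proposed fix, filtered null tuples are recorded the same way.
                mgr.emit(offset);
                mgr.ack(offset);
            }
            // Otherwise the null tuple is dropped and the manager never hears about it.
        }
        long beforeNextRecord = mgr.commit();
        mgr.emit(4);
        mgr.ack(4);
        long afterNextRecord = mgr.commit();
        return new long[] {beforeNextRecord, afterNextRecord};
    }

    public static void main(String[] args) {
        long[] buggy = simulate(false);
        long[] fixed = simulate(true);
        System.out.println("without fix: " + buggy[0] + " then " + buggy[1]);
        System.out.println("with fix:    " + fixed[0] + " then " + fixed[1]);
    }
}
```

Running main prints "without fix: 2 then 5" and "with fix:    4 then 5". Without the fix, the commit point sticks at offset 2 until the next non-null record (offset 4) is acked, and only then jumps past the "gap"; with the fix, the null tuples at offsets 2 and 3 are committed straight away.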
If you'd like to give fixing it a shot, the two places to look would be https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L503 and https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L602. If you don't, let me know and I'll take a look when I get a chance.

> KafkaSpout consumes messages but doesn't commit offsets
> -------------------------------------------------------
>
>                 Key: STORM-2994
>                 URL: https://issues.apache.org/jira/browse/STORM-2994
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0, 1.1.2
>            Reporter: RAbreu
>            Priority: Major
>
> A topology that consumes from two different Kafka clusters: 0.10.1.1 and
> 0.10.2.1.
> Spouts consuming from 0.10.2.1 have a low lag (and regularly commit offsets)
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1- Unknown lag
> 2- Lag that increments as the Spout reads messages from Kafka
>
> In DEBUG, Offset manager logs: "topic-partition has NO offsets ready to be
> committed", despite continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1,
> in case messages with a lower offset were being retried (logs didn't show it,
> though)
> offsetCommitPeriodMs was also lowered to no avail.
> The only configuration that works is to have
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesired since
> we lose processing guarantees.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)