Which version of Storm are you using?

On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko <f1she...@gmail.com> wrote:
Thanks for the reply, Hugo. I'll double-check the log tomorrow for KafkaSpoutRetryExponentialBackoff calls. I just noticed something strange in the log I have. The first message is "Unexpected offset found [2777849]". That's strange, because the committed offset for partition 10 is 2777978, which is a little higher than the offset found. The next message in the log was "No offsets ready to commit." So after checking offset 2777849, the spout immediately stopped looking for new offsets to commit.

On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro <hlo...@hortonworks.com> wrote:

Hi,

Most likely this is happening because some messages failed and/or were acked out of order. For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is the message with offset 4 that failed, the spout will only commit offset 3. Until the message with offset 4 is acked, or reaches the maximum number of retries (which is configurable, but unlimited by default), the messages with offsets 5,6,7,… will not be committed even though they have been acked. That is because you cannot do kafkaConsumer.commitSync(new TopicPartition(test_topic,5)) while the message with offset 4 has neither been acked nor discarded after reaching the maximum number of retries. Until the spout moves past the message with offset 4, the lag will keep increasing as new messages come in.

You can try setting the log level to ALL for org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff to see which messages are being retried. You can also set the log level to DEBUG or ALL for org.apache.storm.kafka.spout.KafkaSpout to see exactly which offsets/records are being processed. However, this will print a lot of messages and may slow down processing considerably. You can also set the maximum number of retries to a small number (e.g. 3-5) to see if that resolves the situation.
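The offset-4 example can be modelled in a few lines. This is a hypothetical, simplified sketch, not the storm-kafka-client source: the class `CommitModel`, its methods and the `maxRetries` field are illustrative names. It assumes an offset counts as "done" once it is acked, or once it has failed `maxRetries` times and is given up on, and that the spout can only commit the contiguous run of done offsets following the last committed offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model of per-partition commit bookkeeping (not the real KafkaSpout code)
public class CommitModel {
    private final int maxRetries;                           // hypothetical retry limit
    private long committed;                                 // last offset committed for the partition
    private final TreeSet<Long> done = new TreeSet<>();     // acked or given-up offsets not yet committed
    private final Map<Long, Integer> failures = new HashMap<>();

    public CommitModel(long committed, int maxRetries) {
        this.committed = committed;
        this.maxRetries = maxRetries;
    }

    public void ack(long offset) {
        done.add(offset);
    }

    // Record a failure; after maxRetries failures the offset is treated as done
    public void fail(long offset) {
        int n = failures.merge(offset, 1, Integer::sum);
        if (n >= maxRetries) {
            done.add(offset);
        }
    }

    // Advance the commit point over the contiguous run of done offsets and return it
    public long commit() {
        while (done.contains(committed + 1)) {
            committed++;
            done.remove(committed);
        }
        return committed;
    }

    public static void main(String[] args) {
        CommitModel p = new CommitModel(0, 3);
        for (long o : new long[] {1, 2, 3, 5, 6, 7}) {
            p.ack(o);
        }
        p.fail(4);
        System.out.println(p.commit()); // 3: offset 4 blocks 5, 6, 7
        p.fail(4);
        p.fail(4);                      // third failure: offset 4 is given up on
        System.out.println(p.commit()); // 7
    }
}
```

With an unlimited retry count, offset 4 would never become done and `commit()` would stay at 3 however many later offsets are acked, which is the growing-lag symptom described above.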
Hugo

> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <f1she...@gmail.com> wrote:
>
> Today in Storm UI I saw this Kafka Spouts Lag:
>
> Id           Topic       Partition  Latest Offset  Spout Committed Offset  Lag
> Kafka Spout  test_topic  0          5591087        5562814                 28273
> Kafka Spout  test_topic  1          2803256        2789090                 14166
> Kafka Spout  test_topic  2          2801927        2787767                 14160
> Kafka Spout  test_topic  3          2800627        2800626                 1
> Kafka Spout  test_topic  4          2799391        2785238                 14153
> Kafka Spout  test_topic  5          2798126        2798125                 1
> Kafka Spout  test_topic  6          2796874        2782726                 14148
> Kafka Spout  test_topic  7          2795669        2781528                 14141
> Kafka Spout  test_topic  8          2794419        2780280                 14139
> Kafka Spout  test_topic  9          2793255        2793254                 1
> Kafka Spout  test_topic  10         2792109        2777978                 14131
> Kafka Spout  test_topic  11         2790939        2776817                 14122
> Kafka Spout  test_topic  12         2789783        2775665                 14118
> Kafka Spout  test_topic  13         2788651        2774539                 14112
> Kafka Spout  test_topic  14         2787521        2773412                 14109
>
> There were no new messages in that topic for a while, so I expected my
> topology to have processed all of them. But the lag shows that there are
> uncommitted messages in most of the partitions. The topology stopped working
> and didn't process any messages for a few hours.
>
> In the logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found
> [2777849]. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755,
> committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10,
> offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850,
> numFails=0},
> ........................................
> {topic-partition=test_topic-10, offset=2792107, numFails=0},
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
> 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready to
> commit. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755,
> committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10,
> offset=2777849, numFails=0},
> .......................................
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
>
> So I assume the messages shown as uncommitted were actually processed by the
> topology and acked. After I started sending new messages to the Kafka topic,
> the topology started working again, but the spout lag keeps increasing.
> Why would the spout stop committing to Kafka?
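Igor's observation (the scan stops right after "Unexpected offset found [2777849]") can be illustrated with a second hypothetical model. It assumes the commit scan walks the acked offsets in ascending order and stops at the first one that is not committedOffset + 1. Under that assumption, an acked offset below the committed offset (2777849 < 2777978) ends the scan before any committable offset is reached. The names `strictScan` and `skipStaleScan`, and the assumption that the acked offsets elided in the log are contiguous, are illustrative only; neither method is taken from the storm-kafka-client source, and the skip-stale variant merely shows one way such a stall could be avoided.

```java
import java.util.TreeSet;

// Hypothetical models of the commit scan (not the real KafkaSpout/OffsetEntry code)
public class StallModel {
    // Gives up at the first acked offset that is not committed + 1
    static long strictScan(long committed, TreeSet<Long> acked) {
        long next = committed;
        for (long offset : acked) {
            if (offset == next + 1) {
                next = offset;
            } else {
                break; // a stale offset (<= committed) or a gap both end the scan
            }
        }
        return next;
    }

    // Skips offsets already covered by the committed offset, stops only at a real gap
    static long skipStaleScan(long committed, TreeSet<Long> acked) {
        long next = committed;
        for (long offset : acked) {
            if (offset <= next) {
                continue; // already committed: ignore instead of stopping
            }
            if (offset != next + 1) {
                break;    // genuine gap: wait for the missing offset
            }
            next = offset;
        }
        return next;
    }

    public static void main(String[] args) {
        long committed = 2777978L; // committedOffset from the log for test_topic-10
        TreeSet<Long> acked = new TreeSet<>();
        // Assumption: the acked offsets elided in the log are contiguous
        for (long o = 2777849L; o <= 2792108L; o++) {
            acked.add(o);
        }
        System.out.println(strictScan(committed, acked));    // 2777978: nothing new to commit
        System.out.println(skipStaleScan(committed, acked)); // 2792108
    }
}
```

In this model the strict scan returns the unchanged committed offset, matching the "No offsets ready to commit" line, while the lag keeps growing as new offsets are acked behind the stale entry.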