I'm using Hortonworks Data Platform 2.5.0.0 with the included version of Storm, 1.0.1.2.5.0.0-1245. I guess the closest branch in the Apache Storm repo is 1.x-branch.

On Fri, Feb 17, 2017 at 12:34 AM, Hugo Da Cruz Louro <hlo...@hortonworks.com> wrote:

> Which version of Storm are you using?
>
> On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko <f1she...@gmail.com> wrote:
>
> Thanks for the reply, Hugo.
> I'll double-check the log tomorrow, looking for
> KafkaSpoutRetryExponentialBackoff calls.
>
> I just noticed something strange in the log I have. The first message is
> "*Unexpected offset found [2777849]*". It's strange because the committed
> offset for partition 10 is 2777978, which is a little higher than the
> offset found. The next message in the log was "*No offsets ready to
> commit.*"
>
> So, after checking offset *2777849*, it immediately stopped looking for a
> new offset to commit.
>
> On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro <hlo...@hortonworks.com> wrote:
>
>> Hi,
>>
>> Most likely this is happening because some messages failed and/or got
>> acked out of order.
>>
>> For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X
>> is the message (with offset 4) that failed, the spout will only commit
>> offset 3. Until the message with offset 4 is acked, or reaches the max
>> number of retries (which is configurable but by default is forever), the
>> messages with offsets 5,6,7,… will not get committed despite having been
>> acked. That is because you cannot do kafkaConsumer.commitSync(new
>> TopicPartition(test_topic,5)) if the message with offset 4 has not been
>> acked or discarded by reaching the max number of retries. Until the spout
>> moves on from the message with offset 4, the lag will increase when new
>> messages come in.
>>
>> You can try setting the log level to ALL for
>> org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff to see
>> which messages are getting retried. You can also set the log level to
>> DEBUG or ALL for org.apache.storm.kafka.spout.KafkaSpout to see exactly
>> which offsets/records are being processed. However, it will print a lot
>> of messages, and may slow down processing considerably.
>>
>> You can also set maxNumberOfRetries to a small number (e.g. 3-5) to
>> see if that solves this situation.
>>
>> Hugo
>>
>> > On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <f1she...@gmail.com> wrote:
>> >
>> > Today in Storm UI I saw this Kafka Spouts Lag:
>> >
>> > Id           Topic       Partition  Latest Offset  Spout Committed Offset  Lag
>> > Kafka Spout  test_topic  0          5591087        5562814                 28273
>> > Kafka Spout  test_topic  1          2803256        2789090                 14166
>> > Kafka Spout  test_topic  2          2801927        2787767                 14160
>> > Kafka Spout  test_topic  3          2800627        2800626                 1
>> > Kafka Spout  test_topic  4          2799391        2785238                 14153
>> > Kafka Spout  test_topic  5          2798126        2798125                 1
>> > Kafka Spout  test_topic  6          2796874        2782726                 14148
>> > Kafka Spout  test_topic  7          2795669        2781528                 14141
>> > Kafka Spout  test_topic  8          2794419        2780280                 14139
>> > Kafka Spout  test_topic  9          2793255        2793254                 1
>> > Kafka Spout  test_topic  10         2792109        2777978                 14131
>> > Kafka Spout  test_topic  11         2790939        2776817                 14122
>> > Kafka Spout  test_topic  12         2789783        2775665                 14118
>> > Kafka Spout  test_topic  13         2788651        2774539                 14112
>> > Kafka Spout  test_topic  14         2787521        2773412                 14109
>> >
>> > There were no new messages in that topic for a while, so I expected
>> > that my topology would process all messages. But the lag shows that
>> > there are some uncommitted messages in most of the partitions. The
>> > topology stopped working and didn't process any messages for a few
>> > hours.
>> >
>> > In the logs I found these messages:
>> >
>> > 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset
>> > found [2777849]. OffsetEntry{topic-partition=test_topic-10,
>> > fetchOffset=2775755, committedOffset=2777978,
>> > ackedMsgs=[{topic-partition=test_topic-10, offset=2777849, numFails=0},
>> > {topic-partition=test_topic-10, offset=2777850, numFails=0},
>> > ........................................
>> > {topic-partition=test_topic-10, offset=2792107, numFails=0},
>> > {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
>> >
>> > 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready
>> > to commit. OffsetEntry{topic-partition=test_topic-10,
>> > fetchOffset=2775755, committedOffset=2777978,
>> > ackedMsgs=[{topic-partition=test_topic-10, offset=2777849, numFails=0},
>> > .......................................
>> > {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
>> >
>> > So I assume the messages that show as uncommitted were actually
>> > processed by the topology and acked. After I started sending new
>> > messages to the Kafka topic, the topology started working again, but
>> > the spout lag keeps increasing.
>> > Why would the spout stop committing to Kafka?
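
For reference, the commit rule Hugo describes in the quoted reply (only a contiguous run of acked offsets directly after the last committed offset can be committed) can be modelled in a few lines of Java. This is a simplified sketch under my own assumptions, not the actual KafkaSpout code: the class `CommitSketch` and the method `nextCommittable` are hypothetical names, and the model ignores retries and fetch offsets entirely.

```java
import java.util.Arrays;
import java.util.TreeSet;

// Hypothetical, simplified model of "commit only contiguous acked offsets".
// It is NOT the storm-kafka-client implementation.
final class CommitSketch {

    // Returns the highest offset that could be committed, or `committed`
    // itself when no acked offset directly follows it.
    static long nextCommittable(long committed, TreeSet<Long> acked) {
        long next = committed;
        for (long offset : acked) {      // TreeSet iterates in ascending order
            if (offset == next + 1) {
                next = offset;           // extends the contiguous run
            } else {
                // Either a gap (offset > next + 1) or an offset at or below
                // the committed one; in both cases stop looking.
                break;
            }
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 1,2,3,5,6,7 acked; offset 4 failed and is still pending.
        TreeSet<Long> acked = new TreeSet<>(Arrays.asList(1L, 2L, 3L, 5L, 6L, 7L));
        System.out.println(nextCommittable(0L, acked)); // prints 3

        // Once offset 4 is acked, the whole run becomes committable.
        acked.add(4L);
        System.out.println(nextCommittable(0L, acked)); // prints 7

        // Shape of the logged case: the first acked offset (2777849) is
        // below the committed offset (2777978), so nothing is ready.
        TreeSet<Long> logged = new TreeSet<>(Arrays.asList(2777849L, 2777850L));
        System.out.println(nextCommittable(2777978L, logged)); // prints 2777978
    }
}
```

In this model the third call returns the committed offset unchanged, which would match the "No offsets ready to commit" message if the real spout behaves similarly when the first acked offset is lower than the committed one.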