Hi Stig,

maxUncommittedOffsets is set to 10_000 and the retry threshold for exception handling is 10.

Thanks,
Pradeep

On Sat, Sep 2, 2017 at 4:46 AM, Stig Rohde Døssing <s...@apache.org> wrote:

> Thanks. The bolts look fine to me. I'd look at whether the tuples are
> being acked on the spout (use the debug setting on Config), and the
> OffsetManager class logs I linked earlier. I don't know if it's relevant to
> your case, but please note that there are some cases where setting a low
> maxUncommittedOffsets can cause the spout to stop polling for tuples. It's
> being fixed, but please leave maxUncommittedOffsets at the default if
> you're setting it to a custom value.
>
> What is your retry service configuration?
>
> 2017-09-02 0:11 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:
>
>> Yes Stig. The code posted is for DataBaseInsertBolt. The emit from the
>> last bolt is not needed.
>>
>> Problem 2 was for a separate topic. Problem 1 was observed for topics
>> where processing failures were encountered previously.
>>
>> I have attached the error processing and bolt files.
>>
>> Thanks
>> Pradeep
>>
>> On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <s...@apache.org>
>> wrote:
>>
>>> Just to make sure I understand:
>>>
>>> This is your topology
>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt
>>>
>>> The bolt you posted the execute method for is the DataBaseInsertBolt,
>>> right?
>>> What are these statements for if this is the last bolt in the topology?
>>> "super.getOutputCollector().emit(tuple, new Values(fullMessage));"
>>> Are the topics you mention in problem 1 and 2 the same topic?
>>> Essentially what I'm asking is whether the topic that is stuck is also the
>>> one with failures that is starting over on an old offset?
>>> Can you post your RetryService configuration?
>>> You talked about moving tuples to an error queue if they fail
>>> deserialization in the Avro bolt. Can you post that execute too?
>>>
>>> 2017-09-01 20:14 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:
>>>
>>>> Thanks Stig for the response.
>>>> I can give some more detail on the issue we are facing now.
>>>> For any database failure, we retry the tuple up to 10 times.
>>>> Database failures are mostly caused by parent-child relations, since we
>>>> are processing out of order.
>>>> Our consumer group has more than 10 topics and each topic corresponds
>>>> to one table. For example, topics A, B and C in a group correspond
>>>> to tables A, B and C in the database.
>>>> Here, table A is the parent, and tables B and C are child tables.
>>>> Spout parallelism is set to 50 and each topic has 50 partitions. These
>>>> 50 threads go round robin across all the topics in the group.
>>>>
>>>> Issues observed with the current setup are:
>>>>
>>>> 1) One partition of one topic alone gets stuck. The lag on all the
>>>> other partitions is cleared.
>>>>
>>>> 2) Whatever topic had failures earlier goes back to an old offset.
>>>>
>>>>
>>>> DB Bolt Execute Method below
>>>> =======================
>>>> exceptionCount will have a value greater than 0 once the message is
>>>> moved to the error queue. In that case I am acknowledging the message.
>>>> In other cases I am calling tuple.fail.
>>>> There is no downstream bolt for this. This is the final bolt in the
>>>> topology.
>>>>
>>>> @Override
>>>> public void execute(final Tuple tuple) {
>>>>     String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>>>>     GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>>>>     try {
>>>>         // Call to handler for generating Sql
>>>>         Date date = new Date();
>>>>         super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>>>         super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>         super.getOutputCollector().ack(tuple);
>>>>         LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>>>     } catch (Exception e) {
>>>>         LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>>>>         int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>>>         if (exceptionCount != -1) {
>>>>             // If message write is success acknowledge the message so
>>>>             // that it will be removed from kafka queue
>>>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>>             super.getOutputCollector().ack(tuple);
>>>>         } else {
>>>>             super.getOutputCollector().reportError(e);
>>>>             super.getOutputCollector().fail(tuple);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <s...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Pradeep,
>>>>>
>>>>> When you move the message to an error queue, is this happening from
>>>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>>>> cause the tuple to fail)?
>>>>>
>>>>> Storm will ack messages on the spout as long as all edges in the tuple
>>>>> tree are acked, and the topology message timeout hasn't expired before
>>>>> this occurs.
>>>>>
>>>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt
>>>>> is the only bolt consuming from the spout, the bolt will receive t0 and
>>>>> must ack it exactly once. If the AvroDeserializerBolt emits any tuples
>>>>> anchored to t0 (using any of the
>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
>>>>> methods that take a Tuple anchor), the downstream bolts must ack those
>>>>> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on
>>>>> t0. The root tuple on the spout is only acked if t0, t0_0 and t0_1 are
>>>>> acked once each, and they all get acked before the message timeout
>>>>> elapses.
>>>>>
>>>>> Depending on your throughput this may be infeasible, but you might try
>>>>> enabling debug logging
>>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
>>>>> which will let you tell whether the tuple is being acked on the spout.
>>>>>
>>>>> If the tuple is being acked on the spout, you might want to look at
>>>>> some of the logs from this method
>>>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
>>>>> They should show you what the spout is doing internally. Keep in mind
>>>>> that the spout can only commit e.g. offset 10 if offsets 0-9 have all
>>>>> been acked/committed, so if an earlier tuple failed and is waiting for
>>>>> retry when you restart, that could also cause this.
>>>>>
>>>>> 2017-09-01 7:04 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>> I am using Storm 1.1.0, storm-kafka-client version 1.1.1, and the
>>>>>> Kafka server is 0.10.1.1.
>>>>>>
>>>>>> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>>>>>>
>>>>>> The message flow is as below; it's a normal topology:
>>>>>>
>>>>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt.
>>>>>>
>>>>>> If the message fails Avro deserialization, I am moving the message
>>>>>> to an error queue and acknowledging it from the Avro bolt. This
>>>>>> message is not emitted to the database bolt.
>>>>>>
>>>>>> But it's observed that after I restart the topology, the offset for
>>>>>> the topic goes back to an old offset.
>>>>>>
>>>>>> Will Kafka commit the offset only if the message is acked from all
>>>>>> bolts?
>>>>>>
>>>>>> Is the offset going back to a previous value because of this?
>>>>>>
>>>>>> Thanks
>>>>>> Pradeep
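
For anyone reading this thread later: the rule Stig mentions (the spout can only commit offset 10 once offsets 0-9 have all been acked) may be easier to see in a toy model. The following is only a sketch in plain Java. It is not the storm-kafka-client OffsetManager code, and the class and method names (ContiguousCommitSketch, ack, commit) are made up for illustration. It assumes a single partition and ignores retries and timeouts.

```java
import java.util.TreeSet;

/**
 * Toy model of the rule "offset 10 can only be committed once 0-9 are acked".
 * This is NOT the storm-kafka-client OffsetManager; all names are hypothetical.
 */
public class ContiguousCommitSketch {
    // Offset of the next record that has not yet been committed.
    private long committedUpTo;
    // Acked offsets that are still waiting to be committed, kept sorted.
    private final TreeSet<Long> acked = new TreeSet<>();

    public ContiguousCommitSketch(long firstUncommittedOffset) {
        this.committedUpTo = firstUncommittedOffset;
    }

    /** Record that the tuple for this offset was acked on the spout. */
    public void ack(long offset) {
        if (offset >= committedUpTo) {
            acked.add(offset);
        }
    }

    /**
     * Advance over the contiguous run of acked offsets and return the
     * position a consumer would resume from after a restart.
     */
    public long commit() {
        while (acked.remove(committedUpTo)) {
            committedUpTo++;
        }
        return committedUpTo;
    }

    public static void main(String[] args) {
        ContiguousCommitSketch partition = new ContiguousCommitSketch(0);
        // Offsets 0-9 are acked except 3, which failed and is awaiting retry.
        for (long offset = 0; offset < 10; offset++) {
            if (offset != 3) {
                partition.ack(offset);
            }
        }
        System.out.println("resume position while 3 is pending: " + partition.commit());
        // Once the retry of offset 3 is acked, the whole run can be committed.
        partition.ack(3);
        System.out.println("resume position after 3 is acked: " + partition.commit());
    }
}
```

Running main prints a resume position of 3 while offset 3 is still pending, and 10 once it has been acked. In this model a restart during the first state would replay everything from offset 3, even though offsets 4-9 were already processed, which is consistent with the "offset goes back to an old value" symptom described above for topics that had earlier failures.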