Kafka still contains the logs and they would be there upto the configured
time of log retention period. Check server.properties of kafka and update
the log retention period to 5 min and restart kafka and when kafka
stablizes, shut down it and restart the it with original value of log
retentions period property.


On Tue, Sep 9, 2014 at 10:40 PM, Kushan Maskey <
kushan.mas...@mmillerassociates.com> wrote:

> I hope it did because I dont see the multiple tuple failure error. But I
> see another issue.
> I have stopped loading the batch process that sends messages to  Kafka. I
> killed my topology and then restarted again. I still see that message are
> been loaded into Cassandra. Does that mean that storm still trying to
> process the failed messages? Is htere a way to flush the old message out
> from storm so I can fresh start it?
>
> --
> Kushan Maskey
> 817.403.7500
>
> On Tue, Sep 9, 2014 at 10:09 AM, Naresh Kosgi <nareshko...@gmail.com>
> wrote:
>
>> Yes, that is what I was talking about.  Hopefully that fixes it.
>>
>> On Tue, Sep 9, 2014 at 10:59 AM, Kushan Maskey <
>> kushan.mas...@mmillerassociates.com> wrote:
>>
>>> Just realized that the tuple timeout you are talking about is the 
>>> "topology.message.timeout.secs"
>>> which was set to 30 sec and now I made to to 120.
>>>
>>> --
>>> Kushan Maskey
>>> 817.403.7500
>>>
>>> On Tue, Sep 9, 2014 at 9:43 AM, Kushan Maskey <
>>> kushan.mas...@mmillerassociates.com> wrote:
>>>
>>>>
>>>> Thanks and apologies, I should I mentioned that in my question earlier.
>>>> I am using storm 0.9.2 and using the inbuilt KafkaSpout. I do not implement
>>>> any failure my self. Do I need to create my own custom KafkaSpout?
>>>>
>>>> I have not set timeout for tuples. In fact I dont know where to set
>>>> that. Here is my storm config if that is where I need to set the time out.
>>>> But non of them say anything about tuple timeout.
>>>>
>>>> dev.zookeeper.path/tmp/dev-storm-zookeeperdrpc.childopts-Xmx768m
>>>> drpc.invocations.port3773drpc.port3772drpc.queue.size128
>>>> drpc.request.timeout.secs600drpc.worker.threads64java.library.path
>>>> /usr/local/lib:/opt/local/lib:/usr/liblogviewer.appender.nameA1
>>>> logviewer.childopts-Xmx128mlogviewer.port8000nimbus.childopts-Xmx1024m
>>>> nimbus.cleanup.inbox.freq.secs600nimbus.file.copy.expiration.secs600
>>>> nimbus.hostnmcxstrmd001nimbus.inbox.jar.expiration.secs3600
>>>> nimbus.monitor.freq.secs10nimbus.reassigntrue
>>>> nimbus.supervisor.timeout.secs60nimbus.task.launch.secs120
>>>> nimbus.task.timeout.secs30nimbus.thrift.max_buffer_size1048576
>>>> nimbus.thrift.port6627nimbus.topology.validator
>>>> backtype.storm.nimbus.DefaultTopologyValidatorstorm.cluster.mode
>>>> distributedstorm.local.dir/data/disk00/storm/localdir
>>>> storm.local.mode.zmqfalsestorm.messaging.netty.buffer_size5242880
>>>> storm.messaging.netty.client_worker_threads1
>>>> storm.messaging.netty.flush.check.interval.ms10
>>>> storm.messaging.netty.max_retries30storm.messaging.netty.max_wait_ms
>>>> 1000storm.messaging.netty.min_wait_ms100
>>>> storm.messaging.netty.server_worker_threads1
>>>> storm.messaging.netty.transfer.batch.size262144
>>>> storm.messaging.transportbacktype.storm.messaging.netty.Context
>>>> storm.thrift.transport
>>>> backtype.storm.security.auth.SimpleTransportPlugin
>>>> storm.zookeeper.connection.timeout15000storm.zookeeper.port2181
>>>> storm.zookeeper.retry.interval1000
>>>> storm.zookeeper.retry.intervalceiling.millis30000
>>>> storm.zookeeper.retry.times5storm.zookeeper.root/storm
>>>> storm.zookeeper.serversnmcxstrmd001storm.zookeeper.session.timeout20000
>>>> supervisor.childopts-Xmx256msupervisor.enabletrue
>>>> supervisor.heartbeat.frequency.secs5supervisor.monitor.frequency.secs3
>>>> supervisor.slots.ports
>>>> 6700,6701,6702,6703,6704,6705,6706,6707,6708,6709,6710,6711,6712,6713,6714,6715,6716,6717,6718,6719,6720,6721,6722,6723,6724,6725,6726,6727,6728
>>>> supervisor.worker.start.timeout.secs120supervisor.worker.timeout.secs30
>>>> task.heartbeat.frequency.secs3task.refresh.poll.secs10
>>>> topology.acker.executorstopology.builtin.metrics.bucket.size.secs60
>>>> topology.debugfalsetopology.disruptor.wait.strategy
>>>> com.lmax.disruptor.BlockingWaitStrategytopology.enable.message.timeouts
>>>> truetopology.error.throttle.interval.secs10
>>>> topology.executor.receive.buffer.size1024
>>>> topology.executor.send.buffer.size1024
>>>> topology.fall.back.on.java.serializationtruetopology.kryo.factory
>>>> backtype.storm.serialization.DefaultKryoFactory
>>>> topology.max.error.report.per.interval5topology.max.spout.pending
>>>> topology.max.task.parallelismtopology.message.timeout.secs30
>>>> topology.multilang.serializerbacktype.storm.multilang.JsonSerializer
>>>> topology.receiver.buffer.size8topology.skip.missing.kryo.registrations
>>>> falsetopology.sleep.spout.wait.strategy.time.ms1
>>>> topology.spout.wait.strategybacktype.storm.spout.SleepSpoutWaitStrategy
>>>> topology.state.synchronization.timeout.secs60topology.stats.sample.rate
>>>> 0.05topology.taskstopology.tick.tuple.freq.secs
>>>> topology.transfer.buffer.size1024
>>>> topology.trident.batch.emit.interval.millis500topology.tuple.serializer
>>>> backtype.storm.serialization.types.ListDelegateSerializer
>>>> topology.worker.childoptstopology.worker.receiver.thread.count1
>>>> topology.worker.shared.thread.pool.size4topology.workers1
>>>> transactional.zookeeper.porttransactional.zookeeper.root/transactional
>>>> transactional.zookeeper.serversui.childopts-Xmx768mui.port8080
>>>> worker.childopts-Xmx768mworker.heartbeat.frequency.secs1zmq.hwm0
>>>> zmq.linger.millis5000zmq.threads1
>>>>
>>>> --
>>>> Kushan Maskey
>>>> 817.403.7500
>>>>
>>>> On Tue, Sep 9, 2014 at 9:23 AM, Naresh Kosgi <nareshko...@gmail.com>
>>>> wrote:
>>>>
>>>>> What is your timeout setting for failing a tuple? Its hard to say what
>>>>> is causing this issue without more information but the default timeout on
>>>>> tuples is 30 seconds and for some tuples it maybe taking longer then 30
>>>>> seconds to process.  Try increasing the timeout to 1 or 2 min?
>>>>>
>>>>>
>>>>> "Why the ack/failure ack counts are so much higher than the number of
>>>>> records I am trying to process?"
>>>>>
>>>>> how are you implementing the fail() method in your spout?  on failure,
>>>>> this method is called by the framework.  It could be you are reemitting 
>>>>> the
>>>>> tuple to be processed and its failing again.  This could be a reason why u
>>>>> have more failed tuples then records
>>>>>
>>>>> On Tue, Sep 9, 2014 at 10:06 AM, Kushan Maskey <
>>>>> kushan.mas...@mmillerassociates.com> wrote:
>>>>>
>>>>>> I have a batch job where I process more than 100k records from file.
>>>>>> I post all these message to Kafka topic. I have a topology that goes and
>>>>>> fetches these records and dumps them into Cassandra database and also
>>>>>> updates solr and couch databases.
>>>>>>
>>>>>> I have been trying to run the process multiple times to make sure
>>>>>> that the process completes successfully. It does run successfully 
>>>>>> sometimes
>>>>>> and sometimes it errors out saying the following error that says "Too 
>>>>>> many
>>>>>> tuple failures" in the storm UI.
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Too many
>>>>>> tuple failures at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>> at 
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
>>>>>> at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76) at
>>>>>> backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:540)
>>>>>> at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) at
>>>>>> clojure.lang.AFn.run(AFn.java:24) at 
>>>>>> java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.RuntimeException: Too many tuple failures at
>>>>>> storm.kafka.PartitionManager.fail(PartitionManager.java:210) at
>>>>>> storm.kafka.KafkaSpout.fail(KafkaSpout.java:174) at
>>>>>> backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:370) at
>>>>>> backtype.storm.daemon.executor$fn$reify__5576.expire(executor.clj:430) at
>>>>>> backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73) at
>>>>>> backtype.storm.daemon.executor$fn__5573$tuple_action_fn__5579.invoke(executor.clj:435)
>>>>>> at
>>>>>> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:402)
>>>>>> at
>>>>>> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
>>>>>> at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>>> ... 6 more
>>>>>>
>>>>>> once this failure happens, i also see that the number of records
>>>>>> stored in Cassandra database if way much higher than the actual batch
>>>>>> records count. How do I handle this error? Also when there is any kind of
>>>>>> error/exception occurs then the ack failed values goes up form 0 to
>>>>>> thousands. Why the ack/failure ack counts are so much higher thank the
>>>>>> number of records I am trying to process?
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Kushan Maskey
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Regards,
Vikas Agarwal
91 – 9928301411

InfoObjects, Inc.
Execution Matters
http://www.infoobjects.com
2041 Mission College Boulevard, #280
Santa Clara, CA 95054
+1 (408) 988-2000 Work
+1 (408) 716-2726 Fax

Reply via email to