Yes, that is what I was talking about. Hopefully that fixes it.

On Tue, Sep 9, 2014 at 10:59 AM, Kushan Maskey <[email protected]> wrote:
> Just realized that the tuple timeout you are talking about is
> "topology.message.timeout.secs", which was set to 30 sec and which I have
> now set to 120.
>
> --
> Kushan Maskey
> 817.403.7500
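For reference, the same timeout can also be raised per topology in code rather than cluster-wide in storm.yaml; per-topology settings override the cluster defaults. A minimal sketch against the Storm 0.9.x Java API (the topology name and the spout/bolt wiring are placeholders):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class SubmitWithLongerTimeout {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... wire the KafkaSpout and the Cassandra/Solr/Couch bolts here ...

        Config conf = new Config();
        // Same effect as topology.message.timeout.secs: 120. Tuples now have
        // two minutes to be fully acked before Storm fails and replays them.
        conf.setMessageTimeoutSecs(120);

        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}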
> On Tue, Sep 9, 2014 at 9:43 AM, Kushan Maskey <[email protected]> wrote:
>
>> Thanks, and apologies: I should have mentioned this in my question
>> earlier. I am using Storm 0.9.2 with the built-in KafkaSpout. I do not
>> implement any failure handling myself. Do I need to create my own custom
>> KafkaSpout?
>>
>> I have not set a timeout for tuples; in fact, I don't know where to set
>> one. Here is my Storm config, in case that is where the timeout belongs,
>> but none of the entries say anything about a tuple timeout:
>>
>> dev.zookeeper.path: /tmp/dev-storm-zookeeper
>> drpc.childopts: -Xmx768m
>> drpc.invocations.port: 3773
>> drpc.port: 3772
>> drpc.queue.size: 128
>> drpc.request.timeout.secs: 600
>> drpc.worker.threads: 64
>> java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib
>> logviewer.appender.name: A1
>> logviewer.childopts: -Xmx128m
>> logviewer.port: 8000
>> nimbus.childopts: -Xmx1024m
>> nimbus.cleanup.inbox.freq.secs: 600
>> nimbus.file.copy.expiration.secs: 600
>> nimbus.host: nmcxstrmd001
>> nimbus.inbox.jar.expiration.secs: 3600
>> nimbus.monitor.freq.secs: 10
>> nimbus.reassign: true
>> nimbus.supervisor.timeout.secs: 60
>> nimbus.task.launch.secs: 120
>> nimbus.task.timeout.secs: 30
>> nimbus.thrift.max_buffer_size: 1048576
>> nimbus.thrift.port: 6627
>> nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator
>> storm.cluster.mode: distributed
>> storm.local.dir: /data/disk00/storm/localdir
>> storm.local.mode.zmq: false
>> storm.messaging.netty.buffer_size: 5242880
>> storm.messaging.netty.client_worker_threads: 1
>> storm.messaging.netty.flush.check.interval.ms: 10
>> storm.messaging.netty.max_retries: 30
>> storm.messaging.netty.max_wait_ms: 1000
>> storm.messaging.netty.min_wait_ms: 100
>> storm.messaging.netty.server_worker_threads: 1
>> storm.messaging.netty.transfer.batch.size: 262144
>> storm.messaging.transport: backtype.storm.messaging.netty.Context
>> storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin
>> storm.zookeeper.connection.timeout: 15000
>> storm.zookeeper.port: 2181
>> storm.zookeeper.retry.interval: 1000
>> storm.zookeeper.retry.intervalceiling.millis: 30000
>> storm.zookeeper.retry.times: 5
>> storm.zookeeper.root: /storm
>> storm.zookeeper.servers: nmcxstrmd001
>> storm.zookeeper.session.timeout: 20000
>> supervisor.childopts: -Xmx256m
>> supervisor.enable: true
>> supervisor.heartbeat.frequency.secs: 5
>> supervisor.monitor.frequency.secs: 3
>> supervisor.slots.ports: 6700,6701,6702,6703,6704,6705,6706,6707,6708,6709,6710,6711,6712,6713,6714,6715,6716,6717,6718,6719,6720,6721,6722,6723,6724,6725,6726,6727,6728
>> supervisor.worker.start.timeout.secs: 120
>> supervisor.worker.timeout.secs: 30
>> task.heartbeat.frequency.secs: 3
>> task.refresh.poll.secs: 10
>> topology.acker.executors:
>> topology.builtin.metrics.bucket.size.secs: 60
>> topology.debug: false
>> topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
>> topology.enable.message.timeouts: true
>> topology.error.throttle.interval.secs: 10
>> topology.executor.receive.buffer.size: 1024
>> topology.executor.send.buffer.size: 1024
>> topology.fall.back.on.java.serialization: true
>> topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
>> topology.max.error.report.per.interval: 5
>> topology.max.spout.pending:
>> topology.max.task.parallelism:
>> topology.message.timeout.secs: 30
>> topology.multilang.serializer: backtype.storm.multilang.JsonSerializer
>> topology.receiver.buffer.size: 8
>> topology.skip.missing.kryo.registrations: false
>> topology.sleep.spout.wait.strategy.time.ms: 1
>> topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
>> topology.state.synchronization.timeout.secs: 60
>> topology.stats.sample.rate: 0.05
>> topology.tasks:
>> topology.tick.tuple.freq.secs:
>> topology.transfer.buffer.size: 1024
>> topology.trident.batch.emit.interval.millis: 500
>> topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
>> topology.worker.childopts:
>> topology.worker.receiver.thread.count: 1
>> topology.worker.shared.thread.pool.size: 4
>> topology.workers: 1
>> transactional.zookeeper.port:
>> transactional.zookeeper.root: /transactional
>> transactional.zookeeper.servers:
>> ui.childopts: -Xmx768m
>> ui.port: 8080
>> worker.childopts: -Xmx768m
>> worker.heartbeat.frequency.secs: 1
>> zmq.hwm: 0
>> zmq.linger.millis: 5000
>> zmq.threads: 1
>>
>> --
>> Kushan Maskey
>> 817.403.7500
>>
>> On Tue, Sep 9, 2014 at 9:23 AM, Naresh Kosgi <[email protected]> wrote:
>>
>>> What is your timeout setting for failing a tuple? It's hard to say what
>>> is causing this issue without more information, but the default timeout
>>> on tuples is 30 seconds, and some of your tuples may be taking longer
>>> than 30 seconds to process. Try increasing the timeout to 1 or 2 minutes.
>>>
>>> "Why the ack/failure ack counts are so much higher than the number of
>>> records I am trying to process?"
>>>
>>> How are you implementing the fail() method in your spout? On failure,
>>> this method is called by the framework. It could be that you are
>>> re-emitting the tuple to be processed and it is failing again. That
>>> could be why you have more failed tuples than records.
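To make that re-emit pattern concrete: the built-in KafkaSpout handles replay itself (the logic lives in storm.kafka.PartitionManager, which is also what throws "Too many tuple failures" in the trace below), so no custom spout is needed here. The following is a hypothetical sketch of a spout whose fail() replays the pending message, showing why one persistently failing record can drive the failed count far past the record count:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ReplayingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // Pending records keyed by message id, kept so fail() can replay them.
    private final Map<String, String> pending = new ConcurrentHashMap<String, String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // ... fetch the next record from the source, store it in pending,
        // and emit it anchored to its message id ...
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId); // fully processed, never replayed again
    }

    @Override
    public void fail(Object msgId) {
        // Re-emit the same record under the same id. If the downstream
        // failure is persistent (e.g. the tuple timeout is too short for
        // the work being done), this loop fails the same record over and
        // over, inflating the failed count far beyond the record count.
        String record = pending.get(msgId);
        if (record != null) {
            collector.emit(new Values(record), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("record"));
    }
}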
>>> On Tue, Sep 9, 2014 at 10:06 AM, Kushan Maskey <[email protected]> wrote:
>>>
>>>> I have a batch job where I process more than 100k records from a file.
>>>> I post all of these messages to a Kafka topic. I have a topology that
>>>> fetches these records, dumps them into a Cassandra database, and also
>>>> updates Solr and Couch databases.
>>>>
>>>> I have been running the process multiple times to make sure that it
>>>> completes successfully. Sometimes it does, and sometimes it errors out
>>>> with "Too many tuple failures" in the Storm UI:
>>>>
>>>> java.lang.RuntimeException: java.lang.RuntimeException: Too many tuple failures
>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
>>>>     at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)
>>>>     at backtype.storm.daemon.executor$fn__5573$fn__5588$fn__5617.invoke(executor.clj:540)
>>>>     at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
>>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>>     at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.RuntimeException: Too many tuple failures
>>>>     at storm.kafka.PartitionManager.fail(PartitionManager.java:210)
>>>>     at storm.kafka.KafkaSpout.fail(KafkaSpout.java:174)
>>>>     at backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:370)
>>>>     at backtype.storm.daemon.executor$fn$reify__5576.expire(executor.clj:430)
>>>>     at backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73)
>>>>     at backtype.storm.daemon.executor$fn__5573$tuple_action_fn__5579.invoke(executor.clj:435)
>>>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:402)
>>>>     at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>     ... 6 more
>>>>
>>>> Once this failure happens, I also see that the number of records stored
>>>> in the Cassandra database is much higher than the actual batch record
>>>> count. How do I handle this error? Also, whenever any kind of
>>>> error/exception occurs, the failed-ack value goes up from 0 to
>>>> thousands. Why are the ack/failure counts so much higher than the
>>>> number of records I am trying to process?
>>>>
>>>> --
>>>> Kushan Maskey
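On the inflated Cassandra row counts: with anchored, acked tuples Storm gives at-least-once delivery, so every replayed tuple is written again unless the write is idempotent, and a bolt that never acks will time out and replay every tuple it touches. A minimal sketch of a writer bolt along those lines; the class, the "record" field name, and the upsert helper are hypothetical stand-ins for whatever client the topology actually uses:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class CassandraWriterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // hypothetical: open the Cassandra session here
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            upsert(tuple.getStringByField("record"));
            collector.ack(tuple);   // never skip this: unacked tuples time out and replay
        } catch (Exception e) {
            collector.fail(tuple);  // fail fast instead of waiting out the timeout
        }
    }

    private void upsert(String record) {
        // hypothetical: an idempotent write keyed on the record's natural id,
        // so a replayed tuple overwrites the same row instead of adding one
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: emits nothing
    }
}

With writes keyed on each record's natural id, a replayed batch converges to the same row count instead of inflating it.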
