Hi, Raki:

        UpdateOffsetException means the offset your spout requested is out of 
range in Kafka. Try deleting the znode (its zk path should be 
"/transactional/your_txId") and re-submitting the topology.
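        A minimal sketch of that cleanup using ZooKeeper's zkCli.sh, assuming 
the default transactional.zookeeper.root of "/transactional" (your_txId is a 
placeholder for your topology's transactional id):

```shell
# Connect to one of the ZooKeeper nodes your topology uses
bin/zkCli.sh -server xxx.x.x.9:2181

# Inspect the stored transactional state first
ls /transactional

# Recursively delete the state for your topology's transactional id
# ("rmr" on ZooKeeper 3.4.x; newer ZooKeeper versions use "deleteall")
rmr /transactional/your_txId
```

Then re-submit the topology so the spout starts from a valid offset.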

        IMHO, UpdateOffsetException should be caught by default and the spout 
should restart from the latest offset. Checking storm-kafka 
(https://github.com/apache/storm/tree/master/external/storm-kafka), the newest 
version has fixed this issue, so you can also upgrade to the new storm-kafka to 
solve the problem.
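        If you build with Maven, the upgrade is just a dependency bump; a 
sketch (the version shown is illustrative, pick the latest release that 
matches your storm-core version):

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <!-- illustrative version; use the latest release matching your storm-core -->
    <version>0.10.0</version>
</dependency>
```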

Regards
sy.pan


> On 11 Dec 2015, at 04:20, Rakesh Surendra <[email protected]> wrote:
> 
> My default property in storm.yaml file is => transactional.zookeeper.root: 
> "/transactional"
> 
> On Thu, Dec 10, 2015 at 2:19 PM, Rakesh Surendra <[email protected]> wrote:
> I am facing an exception when trying to use OpaqueTridentKafkaSpout, and the 
> topology stops processing tuples when I get this exception in one of the 
> storm/trident workers.
> 
> Code snippet of the TridentKafkaConfig I'm using:
> ----
> OpaqueTridentKafkaSpout kafkaSpout = null;
> // ZooKeeper hosts used by storm-kafka to discover Kafka brokers
> TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
>         new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> spoutConfig.fetchSizeBytes = 14748360; // ~14 MB per fetch request
> kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);
> ----
> Question: Do I need to add "/transactional" to the zookeeper path to fix this?
> ----
> Exception being thrown:
> ----
> 
> java.lang.RuntimeException: storm.kafka.UpdateOffsetException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>     at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
>     at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
>     at clojure.lang.AFn.run(AFn.java:22)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: storm.kafka.UpdateOffsetException
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)
>     at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
>     at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
>     at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
>     at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
>     at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
>     at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
>     at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
>     at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
>     at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
>     at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
>     at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132)
>     ... 6 more
> 
> Any kind of help/suggestion is appreciated.
> 
> Regards,
> Raki
> 
