Hi all!
I am trying to use an HDFS state in a Trident topology, but I always get
the exception reported below. I tried the following config values:
topology.max.spout.pending=1
topology.trident.batch.emit.interval.millis=(500, then 1500, then 5000)
but nothing changed. If I remove the partitionPersist from my code, the
topology runs without any problems. I originally had a parallelismHint of
3, which I have since removed (I also cleared the ZooKeeper data directory
between runs). I am using storm-hdfs 0.1.2 with Storm
0.9.2-incubating, and running Hadoop 2.5.1.
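
For reference, the two settings above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: the class name TridentSettings and the build helper are hypothetical, and a plain HashMap stands in for Storm's Config object (which is itself a map from string keys to values). Only the two keys and the values come from the settings listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm or storm-hdfs.
public class TridentSettings {

    // Returns a map holding the two settings tried above.
    // Assumption: a plain map is used in place of backtype.storm.Config.
    public static Map<String, Object> build(int emitIntervalMillis) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", 1);
        conf.put("topology.trident.batch.emit.interval.millis", emitIntervalMillis);
        return conf;
    }

    public static void main(String[] args) {
        // The three emit intervals that were tried, one after the other.
        for (int interval : new int[] {500, 1500, 5000}) {
            System.out.println(build(interval));
        }
    }
}
```

In a real topology the same key/value pairs would go into the configuration passed at submission time. Neither setting appears anywhere in the stack trace below.
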
I am actually writing my code in Clojure, but that has not been a problem
so far.
Can anyone suggest where to look for my mistake?
Thanks a lot!
Stefano
===========================================================================
2014-10-08 15:32:33 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: java.lang.NullPointerException: null
    at storm.trident.tuple.TridentTupleView.getValueByPointer(TridentTupleView.java:326) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.tuple.TridentTupleView.getValueByField(TridentTupleView.java:277) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at org.apache.storm.hdfs.trident.format.DelimitedRecordFormat.format(DelimitedRecordFormat.java:75) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsState$HdfsFileOptions.execute(HdfsState.java:157) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsState.updateState(HdfsState.java:289) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsUpdater.updateState(HdfsUpdater.java:12) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsUpdater.updateState(HdfsUpdater.java:9) ~[stormjar.jar:na]
    at storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:98) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:359) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted
2014-10-08 15:32:33 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: java.lang.NullPointerException: null
    at storm.trident.tuple.TridentTupleView.getValueByPointer(TridentTupleView.java:326) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.tuple.TridentTupleView.getValueByField(TridentTupleView.java:277) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at org.apache.storm.hdfs.trident.format.DelimitedRecordFormat.format(DelimitedRecordFormat.java:75) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsState$HdfsFileOptions.execute(HdfsState.java:157) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsState.updateState(HdfsState.java:289) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsUpdater.updateState(HdfsUpdater.java:12) ~[stormjar.jar:na]
    at org.apache.storm.hdfs.trident.HdfsUpdater.updateState(HdfsUpdater.java:9) ~[stormjar.jar:na]
    at storm.trident.planner.processor.PartitionPersistProcessor.finishBatch(PartitionPersistProcessor.java:98) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:359) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted
2014-10-08 15:32:34 b.s.util [INFO] Halting process: ("Worker died")