I looked for a solution and found CustomStreamGrouping, which I think should help me out, but I am getting the following exception while implementing it.
java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException
    at clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107)
    at clojure.lang.PersistentVector.nth(PersistentVector.java:111)
    at clojure.lang.APersistentVector.get(APersistentVector.java:171)
    at com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
    at backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
    at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158)
    at backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
    at backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
    at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
    at backtype.storm.task.OutputCollector.emit(OutputCollector.java:49)
    at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
    at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
    at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    ... 6 more

Let me know if anyone has faced the same issue.

On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <clearmido...@gmail.com> wrote:

> Hi All,
>
> I have a topology with a Kafka spout implementation, built with the
> TopologyBuilder shown below:
>
> TopologyBuilder builder=new TopologyBuilder();
> builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
> builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
> builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");
>
> Config config=new Config();
> config.put(Config.TOPOLOGY_WORKERS, 4);
> config.setNumWorkers(2);
> config.setMaxSpoutPending(10);
> config.setMaxTaskParallelism(10);
>
> I have two levels of bolts:
>
> 1) Parser - parses the data and emits an output tuple whose value is a
> serialized POJO
> 2) Persist - persists the forwarded data, received from the previous
> bolt (Parser), after some computation
>
> For the last bolt, PersistBolt ("FileBolt"), I want a fields grouping on
> the Parser bolt based on some field values of the POJO that is being
> emitted.
>
> To make it more clear,
>
> Parser is emitting a POJO of the form,
>
> collector.emit(new Values(responseHandler));
>
> where responseHandler is a POJO,
>
> public class ResponseHandler implements Serializable{
>
>     private String host = null;
>     private String user = null;
>     private String msg = null;
>     public String getHost() {
>         return host;
>     }
>     public void setHost(String host) {
>         this.host = host;
>     }
>     public String getUser() {
>         return user;
>     }
>     public void setUser(String user) {
>         this.user = user;
>     }
>     public String getMsg() {
>         return msg;
>     }
>     public void setMsg(String msg) {
>         this.msg = msg;
>     }
> }
>
> Now I am looking for a way to field group at the host and user level.
>
> Actively looking for a way around this!
>
> Thanks!
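
For reference, the trace points at a list lookup inside HostAPIGrouping.chooseTasks (APersistentVector.get), so one possible cause is an index that is negative or not bounded by the number of target tasks. Below is a minimal sketch of the selection logic I am aiming for, assuming the grouping key is the host and user pair. It is a standalone class that only mirrors the prepare/chooseTasks shape of CustomStreamGrouping and does not depend on Storm; the class name HostUserGroupingSketch, the simplified prepare signature, and the trimmed ResponseHandler are hypothetical and for illustration only.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical standalone sketch: mirrors the prepare/chooseTasks shape of a
// custom grouping, but does not implement Storm's interface.
class HostUserGroupingSketch {

    // Trimmed stand-in for the ResponseHandler POJO (host and user only).
    static class ResponseHandler implements java.io.Serializable {
        private final String host;
        private final String user;

        ResponseHandler(String host, String user) {
            this.host = host;
            this.user = user;
        }

        String getHost() { return host; }
        String getUser() { return user; }
    }

    // Task ids of the downstream bolt, handed over once before any tuple is routed.
    private List<Integer> targetTasks;

    // Simplified signature (assumption): only the target task ids are kept.
    void prepare(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    // Picks exactly one target task for the tuple, keyed on (host, user).
    List<Integer> chooseTasks(int taskId, List<Object> values) {
        ResponseHandler handler = (ResponseHandler) values.get(0);
        int hash = Objects.hash(handler.getHost(), handler.getUser());
        // floorMod keeps the index in [0, size) even when the hash is negative,
        // which a plain % would not.
        int index = Math.floorMod(hash, targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }
}
```

With this logic the same host and user always map to the same task, and the index can never fall outside the task list. Note that the value returned is the task id taken from the list, not the index itself.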