I tried looking for a solution and found this: CustomStreamGrouping.

I guess this should help me out, but I am getting an exception while
implementing it:

java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
        at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException
        at clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107)
        at clojure.lang.PersistentVector.nth(PersistentVector.java:111)
        at clojure.lang.APersistentVector.get(APersistentVector.java:171)
        at com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
        at backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
        at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158)
        at backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
        at backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
        at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
        at backtype.storm.task.OutputCollector.emit(OutputCollector.java:49)
        at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
        at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
        at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76)
        at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
        at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
        at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
        ... 6 more

Let me know if anyone has faced the same issue.
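
For reference, a minimal CustomStreamGrouping sketch along these lines is shown
below. The class name HostFieldGrouping and the choice to hash on the first
tuple field are illustrative assumptions (not my actual HostAPIGrouping); the
point is the bounds-checked access in chooseTasks, which is exactly where the
trace above blows up:

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.WorkerTopologyContext;

    // Illustrative sketch: route each tuple by hashing its first field onto
    // one of the downstream tasks.
    public class HostFieldGrouping implements CustomStreamGrouping, Serializable {

        private List<Integer> targetTasks;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                            List<Integer> targetTasks) {
            // Storm hands over the concrete task ids of the downstream bolt here.
            this.targetTasks = targetTasks;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            // values.get(n) throws IndexOutOfBoundsException when the tuple has
            // fewer than n+1 fields, which matches the APersistentVector.get
            // frame inside chooseTasks in the trace above.
            if (values == null || values.isEmpty() || values.get(0) == null) {
                return Arrays.asList(targetTasks.get(0));
            }
            int index = (values.get(0).hashCode() & Integer.MAX_VALUE) % targetTasks.size();
            return Arrays.asList(targetTasks.get(index));
        }
    }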

On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <clearmido...@gmail.com>
wrote:

> Hi All,
>
> I have a topology with a Kafka spout implementation; the TopologyBuilder
> is set up as shown below:
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>         builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
>         builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_WORKERS, 4);
>         config.setNumWorkers(2);
>         config.setMaxSpoutPending(10);
>         config.setMaxTaskParallelism(10);
>
> I have two levels of bolts:
>
> 1) Parser - parses the data and emits an output tuple containing a
> serialized POJO
> 2) Persist - persists the forwarded data after some computation,
> receiving its input from the previous bolt (Parser)
>
> For the last bolt, PersistBolt ("FileBolt"), I want a fields grouping on
> the Parser bolt's output based on some field of the emitted POJO.
>
>
> To make it clearer:
>
> Parser emits a POJO of the form
>
> collector.emit(new Values(responseHandler));
>
> where responseHandler is a POJO:
>
> import java.io.Serializable;
>
> public class ResponseHandler implements Serializable {
>
>     private String host = null;
>     private String user = null;
>     private String msg = null;
>
>     public String getHost() {
>         return host;
>     }
>     public void setHost(String host) {
>         this.host = host;
>     }
>     public String getUser() {
>         return user;
>     }
>     public void setUser(String user) {
>         this.user = user;
>     }
>     public String getMsg() {
>         return msg;
>     }
>     public void setMsg(String msg) {
>         this.msg = msg;
>     }
> }
>
> Now I am looking for a way to do a fields grouping at the host and user level.
>
> Actively looking for a way to do this!
>
> Thanks!
>
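
For comparison, the built-in fields grouping would also work here if Parser
emitted host and user as separate tuple fields next to the POJO, instead of
only the POJO itself. A rough sketch of that variant, assuming ParserBolt
extends BaseBasicBolt as the stack trace suggests (the field names "host",
"user" and "response" and the parseTuple helper are illustrative, not the
current code):

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class ParserBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            ResponseHandler responseHandler = parseTuple(input); // existing parsing logic
            // Emit host and user as plain fields so the downstream bolt can group on them.
            collector.emit(new Values(responseHandler.getHost(),
                                      responseHandler.getUser(),
                                      responseHandler));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("host", "user", "response"));
        }

        // Illustrative stand-in for whatever the real parsing does.
        private ResponseHandler parseTuple(Tuple input) {
            ResponseHandler handler = new ResponseHandler();
            handler.setHost(input.getString(0));
            return handler;
        }
    }

The topology wiring would then replace globalGrouping with fieldsGrouping on
those fields:

    builder.setBolt("FileBolt", new PersistBolt())
           .fieldsGrouping("Parser", new Fields("host", "user"));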
