Well, I got it... it's simply not possible to distribute the data equally among all the worker threads: fields grouping picks the target task from a hash of the key, and since I don't have many distinct keys, the tuples tend to end up on the same worker.
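[Editor's note] The skew described above can be illustrated with a minimal, self-contained sketch. This is not Storm's exact internal hashing (Storm hashes the list of grouping-field values to pick a task), but the modulo-of-a-hash idea is the same: with only 3 distinct keys, at most 3 of the 8 downstream tasks can ever receive tuples, no matter how many tuples flow.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch of fields-grouping-style routing: a tuple's target
// task is derived from a hash of its grouping key modulo the task count.
public class FieldsGroupingSkew {
    static int chooseTask(String key, int numTasks) {
        // floorMod keeps the index in [0, numTasks) even for negative hashes.
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 8;
        // Few distinct keys, as in the thread above.
        String[] keys = {"host-a", "host-b", "host-c"};
        Set<Integer> tasksUsed = new TreeSet<>();
        for (String key : keys) {
            tasksUsed.add(chooseTask(key, numTasks));
        }
        // At most keys.length tasks can ever be chosen; the rest stay idle.
        System.out.println("tasks that can ever receive tuples: "
                + tasksUsed.size() + " of " + numTasks);
    }
}
```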
I guess that should be fine! Thanks anyways!

On Mon, Feb 23, 2015 at 7:15 PM, Vineet Mishra <clearmido...@gmail.com> wrote:
> Hi Nathan,
>
> Thanks for your reply. Eventually what I am following is the same approach
> you mentioned, but I still couldn't get the benefit of parallelism. To give
> some context: I have a 3-node cluster setup with 1 supervisor and 2 workers.
>
> After running the instance, I could see that multiple tasks are running on
> the same node:
>
> Id     Uptime  Host          Port  Emitted  Transferred  Capacity (last 10m)  Execute latency (ms)  Executed  Process latency (ms)  Acked    Failed
> [5-5]  1m 56s  ip-20-0-0-75  6703  0        0            0.296                0.02                  1723420   0.02                  1723420  0
> [4-4]  30m 4s  ip-20-0-0-78  6703  0        0            0                    0.029                 9700      0.043                 9720     0
> [3-3]  1m 56s  ip-20-0-0-75  6703  0        0            0.001                0.025                 2380      0.017                 2360     0
> [2-2]  30m 4s  ip-20-0-0-78  6703  0        0            0.004                0.025                 400680    0.024                 400680   0
> [1-1]  1m 56s  ip-20-0-0-75  6703  0        0            0.019                0.023                 96480     0.02                  96500    0
>
> (Worker logs: http://ip-20-0-0-75:8000/log?file=worker-6703.log and
> http://ip-20-0-0-78:8000/log?file=worker-6703.log)
>
> This seems to be the reason why there is lag in running the topology.
> I was looking for a way to close this gap!
>
> Thanks!
>
>
> On Mon, Feb 23, 2015 at 6:30 PM, Nathan Leung <ncle...@gmail.com> wrote:
>
>> You can put user and host in separate tuple fields and do fields grouping
>> on those fields.
>> On Feb 23, 2015 6:18 AM, "Vineet Mishra" <clearmido...@gmail.com> wrote:
>>
>>> I tried looking for a solution and found CustomStreamGrouping.
>>>
>>> I guess this should help me out, but I am getting an exception while
>>> implementing it.
>>>
>>> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>     at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.IndexOutOfBoundsException
>>>     at clojure.lang.PersistentVector.arrayFor(PersistentVector.java:107)
>>>     at clojure.lang.PersistentVector.nth(PersistentVector.java:111)
>>>     at clojure.lang.APersistentVector.get(APersistentVector.java:171)
>>>     at com.sd.dwh.kafka.storm.plugin.HostAPIGrouping.chooseTasks(HostAPIGrouping.java:24)
>>>     at backtype.storm.daemon.executor$mk_custom_grouper$fn__3151.invoke(executor.clj:49)
>>>     at backtype.storm.daemon.task$mk_tasks_fn$fn__3101.invoke(task.clj:158)
>>>     at backtype.storm.daemon.executor$fn__3441$fn__3453$bolt_emit__3480.invoke(executor.clj:663)
>>>     at backtype.storm.daemon.executor$fn__3441$fn$reify__3486.emit(executor.clj:698)
>>>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:203)
>>>     at backtype.storm.task.OutputCollector.emit(OutputCollector.java:49)
>>>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:36)
>>>     at backtype.storm.topology.BasicOutputCollector.emit(BasicOutputCollector.java:40)
>>>     at com.sd.dwh.kafka.storm.ParserBolt.execute(ParserBolt.java:76)
>>>     at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
>>>     at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
>>>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>     ... 6 more
>>>
>>> Let me know if anyone has faced the same issue.
>>>
>>> On Mon, Feb 23, 2015 at 3:45 PM, Vineet Mishra <clearmido...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a topology with a Kafka spout, with the topologyBuilder mentioned
>>>> below:
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 8);
>>>> builder.setBolt("Parser", new ParserBolt()).globalGrouping("KafkaSpout");
>>>> builder.setBolt("FileBolt", new PersistBolt()).globalGrouping("Parser");
>>>>
>>>> Config config = new Config();
>>>> config.put(Config.TOPOLOGY_WORKERS, 4);
>>>> config.setNumWorkers(2);
>>>> config.setMaxSpoutPending(10);
>>>> config.setMaxTaskParallelism(10);
>>>>
>>>> I have two levels of bolts:
>>>>
>>>> 1) Parser - parses the data and emits an output tuple whose value is a
>>>>    serialized POJO.
>>>> 2) Persist - persists the forwarded data after some computation; it
>>>>    receives the data from the previous bolt (Parser).
>>>>
>>>> Now, for the last bolt (PersistBolt, "FileBolt"), I want a fields
>>>> grouping on the Parser bolt based on some field of the emitted value
>>>> (the POJO).
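[Editor's note] The IndexOutOfBoundsException earlier in the thread points at HostAPIGrouping.chooseTasks (line 24). That class's source isn't shown, so this is an assumption, but a common cause of this crash in a CustomStreamGrouping is indexing the targetTasks list with a raw hashCode, which can be negative or larger than the list. A pure-Java sketch of a bounds-safe index choice (the class and method names here are hypothetical):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the task-selection step a CustomStreamGrouping's chooseTasks()
// would perform. Assumption: the original bug was indexing targetTasks
// with an unbounded (possibly negative) hash.
public class SafeTaskChooser {
    // floorMod keeps the index in [0, numTargets) even for negative hashes.
    static int chooseIndex(Object key, int numTargets) {
        int h = (key == null) ? 0 : key.hashCode();
        return Math.floorMod(h, numTargets);
    }

    public static void main(String[] args) {
        // Hypothetical target task ids handed to prepare() by Storm.
        List<Integer> targetTasks = Arrays.asList(3, 4, 5, 6);
        int idx = chooseIndex("host-a|user-1", targetTasks.size());
        System.out.println("chosen task: " + targetTasks.get(idx));
    }
}
```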
>>>>
>>>> To make it more clear, Parser emits a POJO:
>>>>
>>>> collector.emit(new Values(responseHandler));
>>>>
>>>> where responseHandler is an instance of:
>>>>
>>>> public class ResponseHandler implements Serializable {
>>>>
>>>>     private String host = null;
>>>>     private String user = null;
>>>>     private String msg = null;
>>>>
>>>>     public String getHost() {
>>>>         return host;
>>>>     }
>>>>     public void setHost(String host) {
>>>>         this.host = host;
>>>>     }
>>>>     public String getUser() {
>>>>         return user;
>>>>     }
>>>>     public void setUser(String user) {
>>>>         this.user = user;
>>>>     }
>>>>     public String getMsg() {
>>>>         return msg;
>>>>     }
>>>>     public void setMsg(String msg) {
>>>>         this.msg = msg;
>>>>     }
>>>> }
>>>>
>>>> Now I am looking for a way to do a fields grouping at the host and user
>>>> level.
>>>>
>>>> Actively looking for a way around this!
>>>>
>>>> Thanks!
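[Editor's note] Nathan's suggestion upthread — emit host and user as separate tuple fields and use fieldsGrouping on them — is the idiomatic Storm answer to this question. The sketch below shows the extraction step from the POJO in plain Java (the Storm wiring is left in comments so the snippet stays free of the Storm dependency); a fields grouping on ("host", "user") routes tuples by exactly this kind of composite key.

```java
// Storm wiring this sketch corresponds to (per the suggestion upthread):
//
//   // in ParserBolt.declareOutputFields:
//   declarer.declare(new Fields("host", "user", "payload"));
//   // in ParserBolt.execute:
//   collector.emit(new Values(r.getHost(), r.getUser(), r));
//   // in the topology builder:
//   builder.setBolt("FileBolt", new PersistBolt(), 4)
//          .fieldsGrouping("Parser", new Fields("host", "user"));

public class GroupingKeyDemo {
    // Minimal stand-in for the ResponseHandler POJO from the thread.
    static class ResponseHandler {
        private final String host, user, msg;
        ResponseHandler(String host, String user, String msg) {
            this.host = host;
            this.user = user;
            this.msg = msg;
        }
        String getHost() { return host; }
        String getUser() { return user; }
    }

    // The composite key a fields grouping on (host, user) effectively
    // hashes on: tuples sharing it always reach the same task.
    static String groupingKey(ResponseHandler r) {
        return r.getHost() + "|" + r.getUser();
    }

    public static void main(String[] args) {
        ResponseHandler r = new ResponseHandler("ip-20-0-0-75", "alice", "hello");
        System.out.println(groupingKey(r)); // prints "ip-20-0-0-75|alice"
    }
}
```

Note that the topology in the thread uses globalGrouping, which sends every tuple to a single task; switching the FileBolt edge to the fieldsGrouping above (with a parallelism hint greater than 1) is what lets multiple tasks share the load.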