Nathan,
The bolts extend BaseBasicBolt, and in the spout I am explicitly emitting a msgId, so the tuples should be tagged and anchored.

What I see is this -
1. The logic execution in the bolt takes no more than 1 ms (measured from the start of execute() to the end of execute()).
2. The time is being spent *between* the bolts.
3. The thread dumps all show the LMAX Disruptor at com.lmax.disruptor.BlockingWaitStrategy.*waitFor()*, which is where the maximum CPU time is being spent.

Is there a pattern with which the buffer sizes need to be tuned? :(

Thanks
Kashyap

On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <andreas.gramme...@gmail.com> wrote:

> Thanks for the clarification regarding Task ID's Nathan, I was under that
> false impression as the site docs are a bit misleading. Thanks for pointing
> that out!
>
> Regards.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <ncle...@gmail.com> wrote:
>
>> If your tuples are reliable (spout emit with message id) and anchored
>> (emit from bolt anchored to input tuple), then you have to answer that
>> yourself. If not, then your output queue size is not constrained by the
>> framework and you may still have high latency.
>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>
>>> Nathan,
>>> My max spout pending is set to 1. Now is my problem with latency or with
>>> throughput.
>>>
>>> Thank you!
>>> Kashyap
>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>
>>>> If your tuples are anchored max spout pending indirectly affects how
>>>> many tuples are generated ;).
>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Nathan. One question though - Are there any best practices when
>>>>> tuples are getting generated in the topology and not really controllable
>>>>> via Max Spout Pending?
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <ncle...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also I would argue that this is not important unless your application
>>>>>> is especially latency sensitive or your queue is so long that it is
>>>>>> causing in flight tuples to timeout.
>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry for a brief response..
>>>>>>> The number of tuples in flight absolutely affects your max latency.
>>>>>>> You need to tune your topology max spout pending. A lower value will
>>>>>>> reduce your end to end latency, but if it's too low it may affect
>>>>>>> throughput. I've posted to the group about this before; if you do a
>>>>>>> search you may find some posts where I've discussed this in more detail.
>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nathan,
>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>>> point in time is affecting the "latency".
>>>>>>>>
>>>>>>>> Am now trying to figure out how the topology should be structured
>>>>>>>> when the no. of transient tuples in the topology is very high.
>>>>>>>>
>>>>>>>> Topology is structured as follows -
>>>>>>>> Consumer (A java program sends a message to storm cluster) -> A
>>>>>>>> (Spout) -> (Emits a number, say 100) -> B (bolt) [Emits 100 messages]
>>>>>>>> -> C (bolt) [Passes along the message to next bolt] -> D (bolt)
>>>>>>>> [Passes along the message to next bolt] -> E (bolt) [Aggregates all
>>>>>>>> the data and confirms if all the 100 messages are processed]
>>>>>>>>
>>>>>>>> What I observed is as follows -
>>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>>> (from sending the message to the Storm cluster until the aggregation
>>>>>>>> is complete) is directly dependent on the volume of messages that is
>>>>>>>> entering into storm and also the no. of emits done by the spout A.
>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100
>>>>>>>> tuples per message (100 x 100 = 10000). There are 10000 tuples emitted
>>>>>>>> and the time taken to aggregate the 100 is 15 ms to 100 ms.*
>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>>> tuples per message (100 x 1000 = 100000). There are 100000 tuples
>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10
>>>>>>>> seconds.*
>>>>>>>> 2. The strange thing is - the more parallelism I add, the worse the
>>>>>>>> times get. Am trying to figure out if the memory per worker is a
>>>>>>>> constraint, but this is the first time I am seeing this.
>>>>>>>>
>>>>>>>> Question - How should the use case be handled wherein the no. of
>>>>>>>> tuples in the topology could increase exponentially?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Kashyap
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>
>>>>>>>>> Unfortunately, I have to use it for my (research) prototype,
>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>
>>>>>>>>> Thank you again,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>> andreas.gramme...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Direct grouping, as it is shown in the storm docs, means that you
>>>>>>>>>>> have to have a specific task id and use "direct streams", which is
>>>>>>>>>>> error prone, probably increases latency and might introduce
>>>>>>>>>>> redundancy problems, as the producer of a tuple needs to know the
>>>>>>>>>>> id of the task the tuple has to go to. Imagine a scenario where the
>>>>>>>>>>> receiving task fails for some reason and the producer can't relay
>>>>>>>>>>> the tuples unless it receives the re-spawned task's id.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>
>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>
>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>
>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>
>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>>>>> working on has to be able to send tuples directly to specific
>>>>>>>>>>>> tasks and, in general, control the data flow. Can you please
>>>>>>>>>>>> explain to me why you would not recommend direct grouping? Is
>>>>>>>>>>>> there any particular reason in the architecture of Storm?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>> reason for it. Shuffle grouping is essentially random with even
>>>>>>>>>>>>> distribution which makes it easier to characterize its
>>>>>>>>>>>>> performance. Local or shuffle grouping stays in process so
>>>>>>>>>>>>> generally it will be faster. However you have to be careful in
>>>>>>>>>>>>> certain cases to avoid task starvation (e.g. you have kafka spout
>>>>>>>>>>>>> with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>>>>>>>>>> "A" tasks in 10 worker processes). Direct grouping depends on
>>>>>>>>>>>>> your code (i.e. you can create hotspots), fields grouping depends
>>>>>>>>>>>>> on your key distribution, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of
>>>>>>>>>>>>>> latency (mainly because of clock drifting).
>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than direct
>>>>>>>>>>>>>> grouping?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining
>>>>>>>>>>>>>>> all of them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>> kashya...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> We are attempting real-time distributed computing using
>>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>>> latency on the same machine or across machines ranges between
>>>>>>>>>>>>>>>> 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>> between an emit of one bolt/spout and getting the message in
>>>>>>>>>>>>>>>> execute() of the next bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>> A (Spout) -> (Emits a number, say 1000) -> B (bolt) [Receives
>>>>>>>>>>>>>>>> this number and divides it into 10 emits of 100 each] -> C
>>>>>>>>>>>>>>>> (bolt) [Receives these emits and divides each into 10 emits of
>>>>>>>>>>>>>>>> 10 numbers] -> D (bolt) [Does some computation on the number
>>>>>>>>>>>>>>>> and emits one message] -> E (bolt) [Aggregates all the data
>>>>>>>>>>>>>>>> and confirms if all the 1000 messages are processed]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result,
>>>>>>>>>>>>>>>> I estimated that the end to end processing for 1000 takes not
>>>>>>>>>>>>>>>> more than 50 msec including any latencies.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>> msec to 3 seconds.
>>>>>>>>>>>>>>>> My estimate was under 50 msec given that each bolt and
>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the
>>>>>>>>>>>>>>>> emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>> between a spout/bolt and the next bolt. I understand that the
>>>>>>>>>>>>>>>> spout/bolt buffers the data into a queue and then the
>>>>>>>>>>>>>>>> subsequent bolt consumes from there.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken
>>>>>>>>>>>>>>>> between a spout/bolt and another bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
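
Nathan's point in this thread that the number of tuples in flight affects the maximum latency can be illustrated with a rough queueing estimate. The sketch below is plain Java, not Storm API code. It assumes, hypothetically, that tuples are spread evenly over a bolt's executors and that each execute() call takes about 1 ms, the figure reported at the top of the thread. The class and method names (QueueLatencyEstimate, totalTuples, drainMillis) and the executor count are made up for illustration.

```java
// Back-of-the-envelope estimate only; nothing here calls into Storm.
final class QueueLatencyEstimate {

    /** Tuples created when each incoming message fans out into fanOut tuples. */
    static long totalTuples(long messages, long fanOut) {
        return messages * fanOut;
    }

    /**
     * Approximate time in ms until the last queued tuple has been executed,
     * assuming an even spread over `executors` and a fixed service time.
     */
    static double drainMillis(long tuplesQueued, int executors, double serviceMillis) {
        long perExecutor = (tuplesQueued + executors - 1) / executors; // ceiling division
        return perExecutor * serviceMillis;
    }

    public static void main(String[] args) {
        double serviceMillis = 1.0; // from the thread: execute() takes about 1 ms
        int executors = 1;          // hypothetical parallelism of the receiving bolt
        for (long fanOut : new long[] {100, 1000}) {
            System.out.printf(
                "fan-out %d: %d tuples in total, ~%.0f ms to drain one message%n",
                fanOut, totalTuples(100, fanOut),
                drainMillis(fanOut, executors, serviceMillis));
        }
    }
}
```

Under these assumptions a fan-out of 100 needs roughly 100 ms to drain through a single executor and a fan-out of 1000 roughly 1 s, per message and per bolt stage, so queueing alone may plausibly account for latencies far above the per-tuple execute() time. Real topologies add batching, acking and network transfer, which this estimate ignores.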