Generally, binary search combined with observation of the system (whether it meets your throughput/latency targets) is a good approach.
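For reference, a minimal sketch of the knobs such a binary search would sweep, assuming the 0.9.x-era backtype.storm API (the starting values below are placeholders, not recommendations):

    import backtype.storm.Config;

    Config conf = new Config();
    // Sweep one knob at a time against a fixed test load: halve the value if
    // latency misses its target, double it if throughput misses its target.
    conf.setMaxSpoutPending(512);                                  // cap on in-flight spout tuples
    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024);  // disruptor queue sizes must be powers of 2
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 1024);
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);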
On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:

> Nathan,
> The bolts extend BaseBasicBolt and, in the spout, I am explicitly emitting a msgId, hence tuples should be tagged and anchored. What I see is this -
> 1. The logic execution in the bolt takes no more than 1 ms (from the start of execute() to the end of execute()).
> 2. The time is being spent *between* the bolts.
> 3. The thread dumps all show the LMAX disruptor at com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum CPU time is being spent.
>
> Is there a pattern with which the buffer sizes need to be tuned? :(
>
> Thanks
> Kashyap
>
> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <andreas.gramme...@gmail.com> wrote:
>
>> Thanks for the clarification regarding task IDs, Nathan; I was under that false impression, as the site docs are a bit misleading. Thanks for pointing that out!
>>
>> Regards.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> -- PGP PKey --
>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>
>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <ncle...@gmail.com> wrote:
>>
>>> If your tuples are reliable (spout emit with message id) and anchored (emit from bolt anchored to input tuple), then you have to answer that yourself. If not, then your output queue size is not constrained by the framework and you may still have high latency.
>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>>
>>>> Nathan,
>>>> My max spout pending is set to 1. Now, is my problem with latency or with throughput?
>>>>
>>>> Thank you!
>>>> Kashyap
>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>
>>>>> If your tuples are anchored, max spout pending indirectly affects how many tuples are generated ;).
>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Nathan. One question though - are there any best practices for when tuples are generated inside the topology and are not really controllable via max spout pending?
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <ncle...@gmail.com> wrote:
>>>>>>
>>>>>>> Also, I would argue that this is not important unless your application is especially latency sensitive, or your queue is so long that it is causing in-flight tuples to time out.
>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry for the brief response. The number of tuples in flight absolutely affects your max latency. You need to tune your topology's max spout pending: a lower value will reduce your end-to-end latency, but if it's too low it may affect throughput. I've posted to the group about this before; if you search you may find some posts where I've discussed this in more detail.
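For reference, a minimal sketch of what "reliable and anchored" looks like in code, and the knob Nathan refers to, assuming the 0.9.x-era backtype.storm API (variable names are illustrative):

    // Spout: emitting with a message id makes the tuple tree reliable, which is
    // what allows TOPOLOGY_MAX_SPOUT_PENDING to cap tuples in flight.
    spoutCollector.emit(new Values(payload), msgId);

    // Bolt: BaseBasicBolt anchors and acks for you; with BaseRichBolt you anchor
    // explicitly by passing the input tuple to emit, then acking it.
    outputCollector.emit(input, new Values(result));
    outputCollector.ack(input);

    // With both in place, lower values trade throughput for latency:
    conf.setMaxSpoutPending(100);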
>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Nathan,
>>>>>>>>> Thanks. I have been running a bare-bones topology as suggested. I am inclined to believe that the number of messages in the topology at that point in time is affecting the "latency".
>>>>>>>>>
>>>>>>>>> I am now trying to figure out how the topology should be structured when the number of transient tuples in the topology is very high.
>>>>>>>>>
>>>>>>>>> The topology is structured as follows -
>>>>>>>>> Consumer (a Java program that sends a message to the Storm cluster) -> A (spout) [emits a number, say 100] -> B (bolt) [emits 100 messages] -> C (bolt) [passes the message along to the next bolt] -> D (bolt) [passes the message along to the next bolt] -> E (bolt) [aggregates all the data and confirms that all 100 messages are processed]
>>>>>>>>>
>>>>>>>>> What I observed is as follows -
>>>>>>>>> 1. The time taken for end-to-end processing of a message (from sending the message to the Storm cluster until aggregation is complete) depends directly on the volume of messages entering Storm and on the number of emits done by spout A.
>>>>>>>>> *Test 1: 100 sequential messages to Storm, with B emitting 100 tuples per message (100 x 100 = 10,000 tuples emitted); the time taken to aggregate the 100 is 15 ms to 100 ms.*
>>>>>>>>> *Test 2: 100 sequential messages to Storm, with B emitting 1,000 tuples per message (100 x 1,000 = 100,000 tuples emitted); the time taken to aggregate the 100 is 4 seconds to 10 seconds.*
>>>>>>>>> 2. The strange thing is that the more parallelism I add, the worse the times get. I am trying to figure out whether memory per worker is a constraint, but this is the first time I am seeing this.
>>>>>>>>>
>>>>>>>>> Question - how should a use case be handled in which the number of tuples in the topology can increase exponentially?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Kashyap
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>
>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype, therefore I have to go along with it.
>>>>>>>>>>
>>>>>>>>>> Thank you again,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Storm task ids don't change: https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <andreas.gramme...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Direct grouping, as shown in the Storm docs, means that you have to have a specific task id and use "direct streams", which is error prone, probably increases latency, and might introduce redundancy problems, since the producer of a tuple needs to know the id of the task the tuple has to go to; imagine a scenario where the receiving task fails for some reason and the producer cannot relay tuples until it receives the re-spawned task's id.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>
>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>
>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
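For reference, a minimal sketch of the direct-stream plumbing Andrew describes, assuming the 0.9.x-era backtype.storm API (component and stream names are illustrative):

    // The emitter declares a direct stream in declareOutputFields:
    declarer.declareStream("direct-stream", true, new Fields("value"));

    // The emitter resolves target task ids itself, e.g. from the TopologyContext
    // (per Nathan's link, task ids are stable once assigned):
    List<Integer> targets = context.getComponentTasks("receiver");
    collector.emitDirect(targets.get(0), "direct-stream", input, new Values(v));

    // The receiver subscribes with directGrouping:
    builder.setBolt("receiver", new ReceiverBolt(), 4)
           .directGrouping("emitter", "direct-stream");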
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nathan, I am using direct grouping because the application I am working on has to be able to send tuples directly to specific tasks - in general, to control the data flow. Can you please explain why you would not recommend direct grouping? Is there any particular reason in the architecture of Storm?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good reason for it. Shuffle grouping is essentially random with even distribution, which makes it easier to characterize its performance. Local-or-shuffle grouping stays in process, so generally it will be faster; however, you have to be careful in certain cases to avoid task starvation (e.g. you have a Kafka spout with 1 partition on the topic and 1 spout task, feeding 10 bolt "A" tasks in 10 worker processes). Direct grouping depends on your code (i.e. you can create hotspots), fields grouping depends on your key distribution, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same thing and have a problem getting the exact milliseconds of latency (mainly because of clock drift).
>>>>>>>>>>>>>>> 2) (To Nathan) Is there a difference in speed among the different groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism: one emit from A becomes 100 emitted from C, and you are joining all of them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker). All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Also, you can try local-or-shuffle grouping if possible, to reduce network transfers.
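For reference, a sketch of those last two suggestions (local-or-shuffle grouping, default acker count) in topology-building code, assuming the 0.9.x-era backtype.storm API with illustrative component names:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new ASpout(), 1);
    // Prefers consumers in the same worker process, skipping serialization and
    // the network; beware starvation when few upstream tasks feed many workers.
    builder.setBolt("A", new ABolt(), 10).localOrShuffleGrouping("spout");

    Config conf = new Config();
    conf.setNumWorkers(20);
    conf.setNumAckers(20);  // one acker per worker; a single acker task funnels all ack traffic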
>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> We are attempting real-time distributed computing using Storm, and the solution has only one problem - inter-bolt latency, on the same machine or across machines, ranges between 2 and 250 ms, and I am not able to figure out why. Network latency is under 0.5 ms. By latency, I mean the time between an emit from one bolt/spout and getting the message in execute() of the next bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>> A (spout) [emits a number, say 1000] -> B (bolt) [receives this number and divides it into 10 emits of 100 each] -> C (bolt) [receives these emits and divides each into 10 emits of 10 numbers] -> D (bolt) [does some computation on the number and emits one message] -> E (bolt) [aggregates all the data and confirms that all 1000 messages are processed]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Every bolt takes under 3 ms to complete, and as a result I estimated that the end-to-end processing for 1000 takes no more than 50 ms, including any latencies.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>> 1. The end-to-end time from spout A to bolt E is 200 ms to 3 seconds. My estimate was under 50 ms, given that each bolt and spout takes under 3 ms to execute, including any latencies.
>>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 ms.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time between a spout/bolt and the next bolt. I understand that the spout/bolt buffers the data into a queue and the subsequent bolt then consumes from there.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPUs and 8 GB RAM each. Workers have 1024 MB each, and there are 20 workers overall.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25 messages are sent to the spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues, and any steps you have taken to mitigate the time taken between a spout/bolt and the next bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Kashyap
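For reference, one way to measure the emit-to-execute gap described above, with field and variable names of my own choosing; across machines the result includes clock drift (Nick's earlier point), so it is only trustworthy when producer and consumer share a host (e.g. under local-or-shuffle grouping) or clocks are tightly synchronized:

    // Producer side: stamp the wall-clock time into the outgoing tuple.
    collector.emit(input, new Values(payload, System.currentTimeMillis()));

    // Consumer side: subtract on arrival.
    @Override
    public void execute(Tuple tuple) {
        long emittedAt = tuple.getLongByField("emittedAt");  // assumes the producer declared a field named "emittedAt"
        long queueDelayMs = System.currentTimeMillis() - emittedAt;
        // record queueDelayMs in a metric/histogram here
        collector.ack(tuple);
    }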