Thanks for the clarification regarding task IDs, Nathan. I was under that false impression because the site docs are a bit misleading. Thanks for pointing that out!
Regards.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
<https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt

On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <ncle...@gmail.com> wrote:

> If your tuples are reliable (spout emit with message id) and anchored
> (emit from bolt anchored to input tuple), then you have to answer that
> yourself. If not, then your output queue size is not constrained by the
> framework and you may still have high latency.
> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>
>> Nathan,
>> My max spout pending is set to 1. Now, is my problem with latency or with
>> throughput?
>>
>> Thank you!
>> Kashyap
>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>
>>> If your tuples are anchored, max spout pending indirectly affects how
>>> many tuples are generated ;).
>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Nathan. One question though - Are there any best practices when
>>>> tuples are getting generated in the topology and not really controllable
>>>> via Max Spout Pending?
>>>>
>>>> Thanks
>>>> Kashyap
>>>>
>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <ncle...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also I would argue that this is not important unless your application
>>>>> is especially latency sensitive or your queue is so long that it is
>>>>> causing in flight tuples to time out.
>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>>
>>>>>> Sorry for the brief response. The number of tuples in flight
>>>>>> absolutely affects your max latency. You need to tune your topology max
>>>>>> spout pending. A lower value will reduce your end to end latency, but if
>>>>>> it's too low it may affect throughput. I've posted to the group about
>>>>>> this before; if you do a search you may find some posts where I've
>>>>>> discussed this in more detail.
>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Nathan,
>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>> point in time is affecting the "latency".
>>>>>>>
>>>>>>> Am now trying to figure out how the topology should be structured
>>>>>>> when the no. of transient tuples in the topology is very high.
>>>>>>>
>>>>>>> Topology is structured as follows -
>>>>>>> Consumer (a Java program sends a message to the storm cluster) ->
>>>>>>> A (spout) [Emits a number, say 100] ->
>>>>>>> B (bolt) [Emits 100 messages] ->
>>>>>>> C (bolt) [Passes along the message to next bolt] ->
>>>>>>> D (bolt) [Passes along the message to next bolt] ->
>>>>>>> E (bolt) [Aggregates all the data and confirms if all the 100
>>>>>>> messages are processed]
>>>>>>>
>>>>>>> What I observed is as follows -
>>>>>>> 1. The time taken for end to end processing of the message
>>>>>>> (from sending the message to the storm cluster until the aggregation
>>>>>>> is complete) is directly dependent on the volume of messages that is
>>>>>>> entering storm and also the no. of emits done by the spout A.
>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100 tuples
>>>>>>> per message (100X100=10000): there are 10000 tuples emitted and the
>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>> tuples per message (100X1000=100000): there are 100000 tuples
>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10
>>>>>>> seconds*
>>>>>>> 2. Strange thing is - the more parallelism I add, the worse the
>>>>>>> times get. Am trying to figure out if the memory per worker is a
>>>>>>> constraint, but this is the first time I am seeing this.
>>>>>>>
>>>>>>> Question - How should the use case be handled wherein the no. of
>>>>>>> tuples in the topology could increase exponentially?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Kashyap
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you all for the valuable info.
>>>>>>>>
>>>>>>>> Unfortunately, I have to use it for my (research) prototype,
>>>>>>>> therefore I have to go along with it.
>>>>>>>>
>>>>>>>> Thank you again,
>>>>>>>> Nick
>>>>>>>>
>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>
>>>>>>>>> Storm task ids don't change:
>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <andreas.gramme...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Direct grouping, as it is shown in the storm docs, means that you
>>>>>>>>>> have to have a specific task id and use "direct streams", which is
>>>>>>>>>> error prone, probably increases latency, and might introduce
>>>>>>>>>> redundancy problems, as the producer of a tuple needs to know the id
>>>>>>>>>> of the task the tuple has to go to; imagine a scenario where the
>>>>>>>>>> receiving task fails for some reason and the producer can't relay
>>>>>>>>>> the tuples until it receives the re-spawned task's id.
>>>>>>>>>>
>>>>>>>>>> Hope this helps.
>>>>>>>>>>
>>>>>>>>>> Kindly yours,
>>>>>>>>>>
>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>
>>>>>>>>>> -- PGP PKey --
>>>>>>>>>> <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello again,
>>>>>>>>>>>
>>>>>>>>>>> Nathan, I am using direct-grouping because the application I am
>>>>>>>>>>> working on has to be able to send tuples directly to specific
>>>>>>>>>>> tasks and, in general, control the data flow. Can you please explain
>>>>>>>>>>> to me why you would not recommend direct grouping? Is there any
>>>>>>>>>>> particular reason in the architecture of Storm?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Nick
>>>>>>>>>>>
>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>> reason for it. Shuffle grouping is essentially random with even
>>>>>>>>>>>> distribution, which makes it easier to characterize its
>>>>>>>>>>>> performance. Local or shuffle grouping stays in process so
>>>>>>>>>>>> generally it will be faster. However you have to be careful in
>>>>>>>>>>>> certain cases to avoid task starvation (e.g. you have kafka spout
>>>>>>>>>>>> with 1 partition on the topic and 1 spout task, feeding 10 bolt
>>>>>>>>>>>> "A" tasks in 10 worker processes). Direct grouping depends on your
>>>>>>>>>>>> code (i.e. you can create hotspots), fields grouping depends on
>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same
>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of
>>>>>>>>>>>>> latency (mainly because of clock drift).
>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among different
>>>>>>>>>>>>> groupings?
>>>>>>>>>>>>> For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining
>>>>>>>>>>>>>> all of them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>> latency on same machine or across machines ranges between 2
>>>>>>>>>>>>>>> - 250 ms. I am not able to figure out why. Network latency is
>>>>>>>>>>>>>>> under 0.5 ms. By latency, I mean the time between an emit
>>>>>>>>>>>>>>> of one bolt/spout to getting the message in execute() of next
>>>>>>>>>>>>>>> bolt.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>> A (spout) [Emits a number, say 1000] ->
>>>>>>>>>>>>>>> B (bolt) [Receives this number and divides this into 10 emits
>>>>>>>>>>>>>>> of 100 each] ->
>>>>>>>>>>>>>>> C (bolt) [Receives these emits and divides this to 10 emits of
>>>>>>>>>>>>>>> 10 numbers] ->
>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one
>>>>>>>>>>>>>>> message] ->
>>>>>>>>>>>>>>> E (bolt) [Aggregates all the data and confirms if all the 1000
>>>>>>>>>>>>>>> messages are processed]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result, I
>>>>>>>>>>>>>>> estimated that the end to end processing for 1000 takes not
>>>>>>>>>>>>>>> more than 50 msec including any latencies.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200 msec
>>>>>>>>>>>>>>> to 3 seconds. My estimate was under 50 msec given that each
>>>>>>>>>>>>>>> bolt and spout take under 3 msec to execute including any
>>>>>>>>>>>>>>> latencies.
>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit
>>>>>>>>>>>>>>> from a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>> between a spout/bolt and the next bolt. I understand that the
>>>>>>>>>>>>>>> spout/bolt buffers the data into a queue and then the
>>>>>>>>>>>>>>> subsequent bolt consumes from there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken between
>>>>>>>>>>>>>>> a spout/bolt and another bolt.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>
>>>>>>>
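
Nathan's remark in the thread that the 50 msec estimate "may be off depending on parallelism" can be illustrated with a rough back-of-the-envelope sketch. It counts how many tuples reach each bolt for one spout message in the A -> B -> C -> D -> E topology from the original post, then estimates how long each bolt is busy if those tuples are spread evenly over its tasks. The class and method names are hypothetical, the number of tasks per bolt is an assumption (the thread never states it), and the 3 ms per `execute()` is the upper bound quoted in the post. This is plain arithmetic, not a model of Storm's internal queues.

```java
// Hypothetical helper, not part of Storm: fan-out arithmetic for one spout message.
final class FanOutEstimate {

    /** Tuples arriving at each bolt, given the fan-out of the emit that feeds it. */
    static long[] tuplesPerStage(int[] fanOut) {
        long[] arriving = new long[fanOut.length];
        long current = 1; // start from a single spout message
        for (int i = 0; i < fanOut.length; i++) {
            current *= fanOut[i];
            arriving[i] = current;
        }
        return arriving;
    }

    /** Time a bolt is busy if its tuples are split evenly over `tasks` tasks. */
    static long stageBusyMs(long tuples, int tasks, long execMs) {
        if (tasks < 1) {
            throw new IllegalArgumentException("tasks must be >= 1");
        }
        long perTask = (tuples + tasks - 1) / tasks; // ceiling division
        return perTask * execMs;
    }

    public static void main(String[] args) {
        // A emits 1 tuple to B, B emits 10 to C, C emits 10 per input to D,
        // D emits 1 per input to E (as described in the original post).
        String[] bolts = {"B", "C", "D", "E"};
        int[] fanOut = {1, 10, 10, 1};
        int tasksPerBolt = 1; // assumption: parallelism is not given in the thread
        long execMs = 3;      // "under 3 msec" per execute(), taken as an upper bound

        long[] arriving = tuplesPerStage(fanOut);
        for (int i = 0; i < bolts.length; i++) {
            System.out.printf("%s: %d tuples, busy up to %d ms with %d task(s)%n",
                    bolts[i], arriving[i],
                    stageBusyMs(arriving[i], tasksPerBolt, execMs), tasksPerBolt);
        }
    }
}
```

Under these assumptions, D and E each receive 100 tuples per spout message, so with a single task either bolt alone can be busy for up to 300 ms, well beyond the 50 msec estimate. Raising `tasksPerBolt` to 10 brings that figure down to 30 ms, which is why the estimate depends on parallelism.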