Nathan,
The bolts extend BaseBasicBolt, and in the spout I am explicitly emitting
a msgId, so the tuples should be tagged and anchored.
What I see is this -
1. The logic execution in the bolt takes no more than 1 ms (from the start
of execute() to the end of execute())
2. The time is being spent *between* the bolts
3. The thread dumps all show the LMAX Disruptor at
com.lmax.disruptor.BlockingWaitStrategy.waitFor(), where the maximum CPU
time is being spent.
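
A rough way to see how 1 ms of work per tuple can still add up to long gaps
between bolts is to estimate how long it takes to drain a backlog. The sketch
below is only an illustration under stated assumptions: the class and method
names are made up, the executor count of 4 is an assumed value, the 1000
tuples come from the fan-out in Test 2 further down the thread, and acking,
batching and network transfer are ignored.

```java
// Back-of-the-envelope estimate only; not taken from Storm itself.
final class QueueWaitEstimate {

    /**
     * Time in ms until the last tuple of a backlog has been executed,
     * assuming the backlog is spread evenly over the executors and each
     * execute() call takes executeMs.
     */
    static double drainTimeMs(long queuedTuples, double executeMs, int executors) {
        if (executors <= 0) {
            throw new IllegalArgumentException("executors must be positive");
        }
        // Number of tuples the busiest executor has to work through.
        double perExecutor = Math.ceil((double) queuedTuples / executors);
        return perExecutor * executeMs;
    }

    public static void main(String[] args) {
        // 1000 tuples, 1 ms each, 4 executors (the executor count is an assumption).
        System.out.println(drainTimeMs(1000, 1.0, 4) + " ms");
    }
}
```

Under these assumptions the last tuple waits for hundreds of executions ahead
of it, so the time shows up between the bolts rather than inside execute().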

Is there a pattern with which the buffer sizes need to be tuned? :(
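
One constraint that does apply to these sizes: the LMAX Disruptor ring buffer
only accepts a capacity that is a power of two, so the Disruptor-backed
executor and transfer buffers have to be sized that way. A minimal sketch of
such a check follows. The helper class is hypothetical, the key strings are
assumed to be the values behind the Config constants, and the sizes are the
ones from the original post.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Storm: sanity-checks queue sizes.
final class BufferSizeCheck {

    /** True if n is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Smallest power of two that is >= n, valid for n up to 2^30. */
    static int nextPowerOfTwo(int n) {
        if (n <= 1) {
            return 1;
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    public static void main(String[] args) {
        // Key strings are assumed to match the Config constants in the original post.
        Map<String, Integer> sizes = new LinkedHashMap<>();
        sizes.put("topology.executor.receive.buffer.size", 16384);
        sizes.put("topology.executor.send.buffer.size", 16384);
        sizes.put("topology.transfer.buffer.size", 64);

        for (Map.Entry<String, Integer> e : sizes.entrySet()) {
            int size = e.getValue();
            String verdict = isPowerOfTwo(size)
                    ? "ok"
                    : "not a power of two, next valid size is " + nextPowerOfTwo(size);
            System.out.println(e.getKey() + " = " + size + " (" + verdict + ")");
        }
    }
}
```

All of the sizes in the original config already pass this check, so it only
rules out a misconfiguration; it does not explain the latency by itself.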

Thanks
Kashyap

On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <andreas.gramme...@gmail.com>
wrote:

> Thanks for the clarification regarding task IDs, Nathan. I was under that
> false impression because the site docs are a bit misleading. Thanks for
> pointing that out!
>
> Regards.
>
> Kindly yours,
>
> Andrew Grammenos
>
> -- PGP PKey --
> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>
> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <ncle...@gmail.com> wrote:
>
>> If your tuples are reliable (spout emit with message id) and anchored
>> (emit from bolt anchored to input tuple), then you have to answer that
>> yourself. If not, then your output queue size is not constrained by the
>> framework and you may still have high latency.
>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:
>>
>>> Nathan,
>>> My max spout pending is set to 1. So is my problem with latency or with
>>> throughput?
>>>
>>> Thank you!
>>> Kashyap
>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>
>>>> If your tuples are anchored, max spout pending indirectly affects how
>>>> many tuples are generated ;).
>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Nathan. One question though - Are there any best practices when
>>>>> tuples are getting generated in the topology and not really controllable
>>>>> via Max Spout Pending?
>>>>>
>>>>> Thanks
>>>>> Kashyap
>>>>>
>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <ncle...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also I would argue that this is not important unless your application
>>>>>> is especially latency sensitive or your queue is so long that it is
>>>>>> causing in-flight tuples to time out.
>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry for the brief response. The number of tuples in flight
>>>>>>> absolutely affects your max latency. You need to tune your topology max
>>>>>>> spout pending. A lower value will reduce your end-to-end latency, but if
>>>>>>> it's too low it may affect throughput. I've posted to the group about
>>>>>>> this before; if you do a search you may find some posts where I've
>>>>>>> discussed this in more detail.
>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nathan,
>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>>> am inclined to believe that the no. of messages in the topology at that
>>>>>>>> point in time is affecting the "latency".
>>>>>>>>
>>>>>>>> I am now trying to figure out how the topology should be structured
>>>>>>>> when the no. of transient tuples in the topology is very high.
>>>>>>>>
>>>>>>>> Topology is structured as follows -
>>>>>>>> Consumer (a Java program sends a message to the Storm cluster) ->
>>>>>>>> A (spout) [Emits a number, say 100] ->
>>>>>>>> B (bolt) [Emits 100 messages] ->
>>>>>>>> C (bolt) [Passes along the message to next bolt] ->
>>>>>>>> D (bolt) [Passes along the message to next bolt] ->
>>>>>>>> E (bolt) [Aggregates all the data and confirms if all the 100 messages
>>>>>>>> are processed]
>>>>>>>>
>>>>>>>> What I observed is as follows -
>>>>>>>> 1. The time taken for end-to-end processing of a message (from
>>>>>>>> sending the message to the Storm cluster until the aggregation is
>>>>>>>> complete) is directly dependent on the volume of messages entering
>>>>>>>> Storm and also the no. of emits done by the spout A.
>>>>>>>> *Test 1: 100 sequential messages to Storm with B emitting 100 tuples
>>>>>>>> per message (100 x 100 = 10000). There are 10000 tuples emitted and
>>>>>>>> the time taken to aggregate the 100 is 15 ms to 100 ms.*
>>>>>>>> *Test 2: 100 sequential messages to Storm with B emitting 1000 tuples
>>>>>>>> per message (100 x 1000 = 100000). There are 100000 tuples emitted and
>>>>>>>> the time taken to aggregate the 100 is 4 seconds to 10 seconds.*
>>>>>>>> 2. The strange thing is that the more parallelism I add, the worse the
>>>>>>>> times get. I am trying to figure out if the memory per worker is a
>>>>>>>> constraint, but this is the first time I am seeing this.
>>>>>>>>
>>>>>>>> Question - How should the use case be handled wherein the no. of
>>>>>>>> tuples in the topology could increase exponentially?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Kashyap
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>
>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>
>>>>>>>>> Thank you again,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>> andreas.gramme...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Direct grouping, as shown in the Storm docs, means that you have to
>>>>>>>>>>> have a specific task id and use "direct streams", which is error
>>>>>>>>>>> prone, probably increases latency, and might introduce redundancy
>>>>>>>>>>> problems, as the producer of a tuple needs to know the id of the task
>>>>>>>>>>> the tuple has to go to. Imagine a scenario where the receiving task
>>>>>>>>>>> fails for some reason and the producer can't relay the tuples until
>>>>>>>>>>> it has received the re-spawned task's id.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>
>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>
>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>
>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>
>>>>>>>>>>>> Nathan, I am using direct grouping because the application I am
>>>>>>>>>>>> working on has to be able to send tuples directly to specific tasks
>>>>>>>>>>>> and, in general, control the data flow. Can you please explain to me
>>>>>>>>>>>> why you would not recommend direct grouping? Is there any particular
>>>>>>>>>>>> reason in the architecture of Storm?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Nick
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>> reason for it. Shuffle grouping is essentially random with even
>>>>>>>>>>>>> distribution, which makes it easier to characterize its performance.
>>>>>>>>>>>>> Local or shuffle grouping stays in process, so generally it will be
>>>>>>>>>>>>> faster. However, you have to be careful in certain cases to avoid
>>>>>>>>>>>>> task starvation (e.g. you have a Kafka spout with 1 partition on the
>>>>>>>>>>>>> topic and 1 spout task, feeding 10 bolt "A" tasks in 10 worker
>>>>>>>>>>>>> processes). Direct grouping depends on your code (i.e. you can create
>>>>>>>>>>>>> hotspots), fields grouping depends on your key distribution, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) How exactly do you measure latency? I am doing the same
>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of latency
>>>>>>>>>>>>>> (mainly because of clock drift).
>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speed among different
>>>>>>>>>>>>>> groupings? For instance, is shuffle faster than direct grouping?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are joining all
>>>>>>>>>>>>>>> of them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>> kashya...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> We are attempting real-time distributed computing using Storm,
>>>>>>>>>>>>>>>> and the solution has only one problem: inter-bolt latency on the
>>>>>>>>>>>>>>>> same machine or across machines ranges between 2 and 250 ms. I am
>>>>>>>>>>>>>>>> not able to figure out why. Network latency is under 0.5 ms. By
>>>>>>>>>>>>>>>> latency, I mean the time between an emit from one bolt/spout and
>>>>>>>>>>>>>>>> the message arriving in execute() of the next bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>> A (spout) [Emits a number, say 1000] ->
>>>>>>>>>>>>>>>> B (bolt) [Receives this number and divides it into 10 emits of
>>>>>>>>>>>>>>>> 100 each] ->
>>>>>>>>>>>>>>>> C (bolt) [Receives these emits and divides each into 10 emits of
>>>>>>>>>>>>>>>> 10 numbers] ->
>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one
>>>>>>>>>>>>>>>> message] ->
>>>>>>>>>>>>>>>> E (bolt) [Aggregates all the data and confirms if all the 1000
>>>>>>>>>>>>>>>> messages are processed]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and, as a result, I
>>>>>>>>>>>>>>>> estimated that the end-to-end processing for 1000 takes no more
>>>>>>>>>>>>>>>> than 50 msec including any latencies.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>> 1. The end-to-end time from Spout A to Bolt E is 200 msec to 3
>>>>>>>>>>>>>>>> seconds. My estimate was under 50 msec, given that each bolt and
>>>>>>>>>>>>>>>> spout takes under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>> 2. I noticed that most of the time is spent between the emit from
>>>>>>>>>>>>>>>> a spout/bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time between a
>>>>>>>>>>>>>>>> spout/bolt and the next bolt. I understand that the spout/bolt
>>>>>>>>>>>>>>>> buffers the data into a queue and then the subsequent bolt consumes
>>>>>>>>>>>>>>>> from there.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, 20);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues and
>>>>>>>>>>>>>>>> any steps you have taken to mitigate the time taken between a
>>>>>>>>>>>>>>>> spout/bolt and another bolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>
>
