Generally, binary search combined with observation of the system (whether
it meets throughput/latency targets) is a good approach.
On Jul 17, 2015 6:28 PM, "Kashyap Mhaisekar" <kashya...@gmail.com> wrote:

> Nathan,
> The bolts are extending BaseBasicBolt and also, the in the spout am
> explicitly emitting a msgId hence tuples should be tagged and anchored.
> What I see is this -
> 1. The logic exection in the bolt takes not more than 1 ms (start of
> execute() and end of execute())
> 2. The time is being spent *between* the bolts
> 3. The thread dumps all show LMAX disruptor at -
> com.lmax.disruptor.blockingwaitstrategy.*waitfor() *where the maximum CPU
> time is being spent.
>
> Is there a pattern with which the buffer sizes need to be tuned? :(
>
> Thanks
> Kashyap
>
> On Thu, Jul 16, 2015 at 6:29 PM, Andrew Xor <andreas.gramme...@gmail.com>
> wrote:
>
>> Thanks for the clarification regarding Task ID's Nathan, I was under that
>> false impression as the site docs are a bit misleading. Thanks for pointing
>> that out!
>>
>> Regards.
>>
>> Kindly yours,
>>
>> Andrew Grammenos
>>
>> -- PGP PKey --
>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>
>> On Fri, Jul 17, 2015 at 2:12 AM, Nathan Leung <ncle...@gmail.com> wrote:
>>
>>> If your tuples are reliable (spout emit with message id) and anchored
>>> (emit from bolt anchored to input tuple), then you have to answer that
>>> yourself. If not, then your output queue size is not constrained by the
>>> framework and you may still have high latency.
>>> On Jul 16, 2015 7:05 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>> wrote:
>>>
>>>> Nathan,
>>>> My max spout pending is set to 1. Now is my problem with latency or
>>>> with throughput.
>>>>
>>>> Thank you!
>>>> Kashyap
>>>> On Jul 16, 2015 5:46 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>
>>>>> If your tuples are anchored max spout pending indirectly affects how
>>>>> many tuples are generated ;).
>>>>> On Jul 16, 2015 6:18 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Nathan. One question though - Are there any best practices
>>>>>> when tuples are getting generated in the topology and not really
>>>>>> controllable via Max Spout Pending?
>>>>>>
>>>>>> Thanks
>>>>>> Kashyap
>>>>>>
>>>>>> On Thu, Jul 16, 2015 at 5:07 PM, Nathan Leung <ncle...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Also I would argue that this is not important unless your
>>>>>>> application is especially latency sensitive or your queue is so long 
>>>>>>> that
>>>>>>> it is causing in flight tuples to timeout.
>>>>>>> On Jul 16, 2015 6:05 PM, "Nathan Leung" <ncle...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry for a brief response.. The number of tuples in flight
>>>>>>>> absolutely affects your max latency.  You need to tune your topology 
>>>>>>>> max
>>>>>>>> spout pending.  Lower value will reduce your end to end latency, but if
>>>>>>>> it's too low it may affect throughput.  I've posted to the group about 
>>>>>>>> this
>>>>>>>> before; if you do a search you may find some posts where I've discussed
>>>>>>>> this in more detail.
>>>>>>>> On Jul 16, 2015 5:56 PM, "Kashyap Mhaisekar" <kashya...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nathan,
>>>>>>>>> Thanks. Have been running on a bare bones topology as suggested. I
>>>>>>>>> am inclined to believe that the no. of messages in the topology at 
>>>>>>>>> that
>>>>>>>>> point in time is affecting the "latency".
>>>>>>>>>
>>>>>>>>> Am trying to now figure out how should the topology be structured
>>>>>>>>> when the no. of transient tupples in the topology is very high.
>>>>>>>>>
>>>>>>>>> Topology is structured as follows -
>>>>>>>>> Consumer (A java program sends a message to storm cluster) ->  A
>>>>>>>>> (Spout) ->(Emits a number say 100) -> B (bolt) [Emits 100 messages]) 
>>>>>>>>> -> C
>>>>>>>>> (bolt) [Passes along the message to next bolt) -> D (bolt) [Passes 
>>>>>>>>> along
>>>>>>>>> the message to next bolt] -> E (bolt) [Aggregates all the data and 
>>>>>>>>> confirms
>>>>>>>>> if all the 100 messages are processed)
>>>>>>>>>
>>>>>>>>> What I observed is as follows -
>>>>>>>>> 1. The time taken for an end to end processing of the message
>>>>>>>>> (Sending the message to Storm cluster and till the time the 
>>>>>>>>> aggregation is
>>>>>>>>> complete) is directly dependent on the volume of messages that is 
>>>>>>>>> entering
>>>>>>>>> into storm and also the no. of emits done by the spout A.
>>>>>>>>> *Test 1: 100 sequential messages to storm with B emitting 100
>>>>>>>>> tuples per message (100X100=10000) there are 10000 tuples emitted and 
>>>>>>>>> the
>>>>>>>>> time taken to aggregate the 100 is 15 ms to 100 ms*
>>>>>>>>> *Test 2: 100 sequential messages to storm with B emitting 1000
>>>>>>>>> tuples per message (100X1000=100000) **there are 100000 tuples
>>>>>>>>> emitted and the time taken to aggregate the 100 is 4 seconds to 10 
>>>>>>>>> seconds*
>>>>>>>>> 2.Strange thing is - the more parallelism i add, the times are so
>>>>>>>>> much more bad. Am trying to figure out if the memory per worker is a
>>>>>>>>> constraint, but this is the firs time am seeing this.
>>>>>>>>>
>>>>>>>>> Question - How should the use case be handled where in the no. of
>>>>>>>>> tuples in the topology could increase exponentially..,
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Kashyap
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 16, 2015 at 3:52 PM, Nick R. Katsipoulakis <
>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thank you all for the valuable info.
>>>>>>>>>>
>>>>>>>>>> Unfortunately, I have to use it for my (research) prototype
>>>>>>>>>> therefore I have to go along with it.
>>>>>>>>>>
>>>>>>>>>> Thank you again,
>>>>>>>>>> Nick
>>>>>>>>>>
>>>>>>>>>> 2015-07-16 16:33 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>>> Storm task ids don't change:
>>>>>>>>>>> https://groups.google.com/forum/#!topic/storm-user/7P23beQIL4c
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 16, 2015 at 4:28 PM, Andrew Xor <
>>>>>>>>>>> andreas.gramme...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Direct grouping as it is shown in storm docs, means that you
>>>>>>>>>>>> have to have a specific task id and use "direct streams" which is 
>>>>>>>>>>>> error
>>>>>>>>>>>> prone, probably increase latency and might introduce redundancy 
>>>>>>>>>>>> problems as
>>>>>>>>>>>> the producer of tuple needs to know the id of the task the tuple 
>>>>>>>>>>>> will have
>>>>>>>>>>>> to go; so imagine a scenario where the receiving task fails for 
>>>>>>>>>>>> some reason
>>>>>>>>>>>> and the producer can't relay the tuples unless it received the 
>>>>>>>>>>>> re-spawned
>>>>>>>>>>>> task's id.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps.
>>>>>>>>>>>>
>>>>>>>>>>>> Kindly yours,
>>>>>>>>>>>>
>>>>>>>>>>>> Andrew Grammenos
>>>>>>>>>>>>
>>>>>>>>>>>> -- PGP PKey --
>>>>>>>>>>>> ​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
>>>>>>>>>>>> https://www.dropbox.com/s/ei2nqsen641daei/pgpsig.txt
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 16, 2015 at 11:24 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello again,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nathan, I am using direct-grouping because the application I
>>>>>>>>>>>>> am working on has to be able to send tuples directly to specific 
>>>>>>>>>>>>> tasks. In
>>>>>>>>>>>>> general control the data flow. Can you please explain to me why 
>>>>>>>>>>>>> you would
>>>>>>>>>>>>> not recommend direct grouping? Is there any particular reason in 
>>>>>>>>>>>>> the
>>>>>>>>>>>>> architecture of Storm?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-07-16 16:20 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would not recommend direct grouping unless you have a good
>>>>>>>>>>>>>> reason for it.  Shuffle grouping is essentially random with even
>>>>>>>>>>>>>> distribution which makes it easier to characterize its 
>>>>>>>>>>>>>> performance.  Local
>>>>>>>>>>>>>> or shuffle grouping stays in process so generally it will be 
>>>>>>>>>>>>>> faster.
>>>>>>>>>>>>>> However you have to be careful in certain cases to avoid task 
>>>>>>>>>>>>>> starvation
>>>>>>>>>>>>>> (e.g. you have kafka spout with 1 partition on the topic and 1 
>>>>>>>>>>>>>> spout task,
>>>>>>>>>>>>>> feeding 10 bolt "A" tasks in 10 worker processes). Direct 
>>>>>>>>>>>>>> grouping depends
>>>>>>>>>>>>>> on your code (i.e. you can create hotspots), fields grouping 
>>>>>>>>>>>>>> depends on
>>>>>>>>>>>>>> your key distribution, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jul 16, 2015 at 3:50 PM, Nick R. Katsipoulakis <
>>>>>>>>>>>>>> nick.kat...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have two questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) How do you exactly measure latency? I am doing the same
>>>>>>>>>>>>>>> thing and I have a problem getting the exact milliseconds of 
>>>>>>>>>>>>>>> latency
>>>>>>>>>>>>>>> (mainly because of clock drifting).
>>>>>>>>>>>>>>> 2) (to Nathan) Is there a difference in speeds among
>>>>>>>>>>>>>>> different groupings? For instance, is shuffle faster than 
>>>>>>>>>>>>>>> direct grouping?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-07-15 17:37 GMT-04:00 Nathan Leung <ncle...@gmail.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Two things. Your math may be off depending on parallelism.
>>>>>>>>>>>>>>>> One emit from A becomes 100 emitted from C, and you are 
>>>>>>>>>>>>>>>> joining all of them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Second, try the default number of ackers (one per worker).
>>>>>>>>>>>>>>>> All your ack traffic is going to a single task.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Also you can try local or shuffle grouping if possible to
>>>>>>>>>>>>>>>> reduce network transfers.
>>>>>>>>>>>>>>>> On Jul 15, 2015 12:45 PM, "Kashyap Mhaisekar" <
>>>>>>>>>>>>>>>> kashya...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> We are attempting a real-time distributed computing using
>>>>>>>>>>>>>>>>> storm and the solution has only one problem - inter bolt
>>>>>>>>>>>>>>>>> latency on same machine or across machines ranges between
>>>>>>>>>>>>>>>>> 2 - 250 ms. I am not able to figure out why. Network
>>>>>>>>>>>>>>>>> latency is under 0.5 ms. By latency, I mean the time
>>>>>>>>>>>>>>>>> between an emit of one bolt/spout to getting the message in 
>>>>>>>>>>>>>>>>> execute() of
>>>>>>>>>>>>>>>>> next bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a topology like the below -
>>>>>>>>>>>>>>>>> A (Spout) ->(Emits a number say 1000) -> B (bolt)
>>>>>>>>>>>>>>>>> [Receives this number and divides this into 10 emits of 100 
>>>>>>>>>>>>>>>>> each) -> C
>>>>>>>>>>>>>>>>> (bolt) [Recieves these emits and divides this to 10 emits of 
>>>>>>>>>>>>>>>>> 10 numbers) ->
>>>>>>>>>>>>>>>>> D (bolt) [Does some computation on the number and emits one 
>>>>>>>>>>>>>>>>> message] -> E
>>>>>>>>>>>>>>>>> (bolt) [Aggregates all the data and confirms if all the 1000 
>>>>>>>>>>>>>>>>> messages are
>>>>>>>>>>>>>>>>> processed)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Every bolt takes under 3 msec to complete and as a result,
>>>>>>>>>>>>>>>>> I estimated that the end to end processing for 1000 takes not 
>>>>>>>>>>>>>>>>> more than 50
>>>>>>>>>>>>>>>>> msec including any latencies.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Observations*
>>>>>>>>>>>>>>>>> 1. The end to end time from Spout A to Bolt E takes 200
>>>>>>>>>>>>>>>>> msec to 3 seconds. My estimate was under 50 msec given that 
>>>>>>>>>>>>>>>>> each bolt and
>>>>>>>>>>>>>>>>> spout take under 3 msec to execute including any latencies.
>>>>>>>>>>>>>>>>> 2. I noticed that the most of the time is spent between
>>>>>>>>>>>>>>>>> Emit from a Spout/Bolt and execute() of the consuming bolt.
>>>>>>>>>>>>>>>>> 3. Network latency is under 0.5 msec.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not able to figure out why it takes so much time
>>>>>>>>>>>>>>>>> between a spout/bolt to next bolt. I understand that the 
>>>>>>>>>>>>>>>>> spout/bolt buffers
>>>>>>>>>>>>>>>>> the data into a queue and then the subsequent bolt consumes 
>>>>>>>>>>>>>>>>> from there.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Infrastructure*
>>>>>>>>>>>>>>>>> 1. 5 VMs with 4 CPU and 8 GB ram. Workers are with 1024 MB
>>>>>>>>>>>>>>>>> and there are 20 workers overall.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Test*
>>>>>>>>>>>>>>>>> 1. The test was done with 25 messages to the spout => 25
>>>>>>>>>>>>>>>>> messages are sent to spout in a span of 5 seconds.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *Config values*
>>>>>>>>>>>>>>>>> Config config = new Config();
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_WORKERS, Integer.parseInt(20));
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,
>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,
>>>>>>>>>>>>>>>>> 16384);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
>>>>>>>>>>>>>>>>> config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 64);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please let me know if you have encountered similar issues
>>>>>>>>>>>>>>>>> and any steps you have taken to mitigate the time taken 
>>>>>>>>>>>>>>>>> between spout/bolt
>>>>>>>>>>>>>>>>> and another bolt.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Kashyap
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Nikolaos Romanos Katsipoulakis,
>>>>>>>>>> University of Pittsburgh, PhD candidate
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>
>

Reply via email to