You can simplify that to setBolt("id", new CustomBolt(...), 4): the parallelism hint creates 4 executors, and since by default each executor runs a single task, you end up with 4 tasks without calling setNumTasks().
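As a rough illustration, the two declarations below describe the same thing (the spout and bolt classes, component ids, and grouping are only placeholders, not taken from your topology):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new CustomSpout());

// The parallelism hint of 4 creates 4 executors; tasks default to one per
// executor, so this already gives 4 tasks:
builder.setBolt("custom-bolt", new CustomBolt(), 4)
       .shuffleGrouping("spout");

// ...which is equivalent to the explicit form:
// builder.setBolt("custom-bolt", new CustomBolt(), 4)
//        .setNumTasks(4)
//        .shuffleGrouping("spout");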
If the throughput is 60% of the input rate, then I think the other 40% is waiting to get processed. Those tuples sit in the disruptor queue; once that is full they wait in the TCP buffers, and once those are full the spout cannot emit any more. In other words, the latency you are computing includes the waiting time, the serialization & deserialization time, and the network time, so you are really measuring the complete latency as seen from the Spout. That also depends on your max spout pending (and the Storm UI already reports this metric for every Spout), so we cannot say whether 3 sec is real or not, because you are not measuring just the latency of the Bolt code; as mentioned above, you are inherently measuring several things at once, including waiting time, IPC, and network transfer time.

Regarding capacity: if you want to measure it yourself, measure the number of messages you process per second together with the process latency of the Bolt in ms, and from those two you can work out the capacity of the Bolt.

On Fri, Sep 4, 2015 at 6:03 AM, Nick R. Katsipoulakis <[email protected]> wrote:

> Hello Kishore,
>
> Please see my answers below.
>
> Thanks,
> Nick
>
> 2015-09-03 18:59 GMT-04:00 Kishore Senji <[email protected]>:
>
>> When you say "4 executors per bolt", are you referring to the parallelism_hint parameter that you set in the setBolt() method call? If so, what that does is create 4 instances of the same Bolt and run them in parallel via 4 executors (threads), so it is still 1 bolt instance per thread, but a total of 4 instances running in parallel.
>
> I am referring to 4 executors per bolt and 1 task for each executor to execute the code of the bolt. So, it is not only the parallelism hint in setBolt(), but also the .setNumTasks(). For instance, if I want a certain bolt to be executed by 4 threads (executors), then I do:
>
> setBolt(new CustomBolt(...), X).setNumTasks(X), where X is 4 in this case.
>
>> For this reason, the latency of a given bolt does not change whether we are running 1 instance or 4 instances in parallel, as the same logic gets executed. However, the throughput increases because there are now 4 instances running in parallel.
>
> You are right; I am seeing throughput increasing, but also latency increasing.
>
>> I think the way you are measuring the latency is not accurate, because the tick tuples go into the same queue (as the rest of the messages) for the Bolt to process. If there is a burst of messages within a second, the tick tuple will be queued behind it and will get delayed, so there is no guarantee that it executes in the Bolt at exactly the interval specified in the config. For the same reason your Bolt could be processing more or fewer tuples depending on the rate, so the measured latency could be more or less (because the latency is not measured over a fixed set of messages in your code).
>
> Actually, that is why I use the tick-tuples method. I want to examine the trade-off of adding more (remote) executors to a topology. I want to measure the network latency in my code, and using tick tuples can reveal the amount of time a message spends in an input message queue.
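On the capacity point above, here is a rough sketch of that bookkeeping done inside a bolt. The field names, the window length, and the reset method are made up for illustration (this is not an existing Storm API), but the Storm UI capacity metric is computed along the same lines:

private long tuplesExecutedInWindow = 0;          // divided by the window length this gives messages/sec
private long executeTimeMsInWindow = 0;           // total time spent inside execute(), in ms
private static final long WINDOW_MS = 10 * 1000L; // hypothetical 10-second measurement window

public void execute(Tuple tuple) {
    long start = System.currentTimeMillis();
    // ... the bolt's actual processing ...
    executeTimeMsInWindow += System.currentTimeMillis() - start;
    tuplesExecutedInWindow++;
}

// Call once per window, e.g. whenever a tick tuple arrives:
private double capacityForWindow() {
    // capacity = (messages executed * average process latency in ms) / window length in ms,
    // which reduces to the total time spent in execute() divided by the window length.
    double capacity = (double) executeTimeMsInWindow / WINDOW_MS;
    tuplesExecutedInWindow = 0;
    executeTimeMsInWindow = 0;
    return capacity;
}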
>> But assuming it is all a fixed rate of messages, why does the latency per thread increase as you increase the number of threads? My guess is that with 1 Bolt instance the Spout and the Bolt could have been in the same worker process, while with 4 instances they could have been distributed (assuming you have more workers), so the network time could be added here as well (in your code you are really measuring the extra time between two ticks, which includes the network time of the messages as well as the processing time of the Bolt).
>
> This is exactly what I believe is happening. However, if you take a look at the graphs I attached to my original message, the numbers do not make much sense: I expect to see more latency in the 4 executors/bolt case, but I do not expect to see 2X the latency of the 1 executor/bolt case. In addition, I do not think that 3 sec of latency is real when my throughput is close to 60% of the input rate (the input rate is 64K tuples per second).
>
>> Why don't you just refer to the Storm UI metrics? The processing latency and the capacity of the bolt can be obtained in the Storm UI with no extra code.
>
> I wanted to also capture the network cost of distributing a computation. Hence, the latencies on the UI are not suitable for me. On the other hand, capacity is a useful metric. Do you know how I can get the capacity from the Bolt's code?
>
>> On Thu, Sep 3, 2015 at 1:51 PM, Nick R. Katsipoulakis <[email protected]> wrote:
>>
>>> Hello all,
>>>
>>> I have been working on a project for some time now, and I want to share a phenomenon I came across in my experiments, which I cannot explain.
>>>
>>> My project deals with scalability of a specific stream processing algorithm, and I want to see how scalable my approach is when I add more executors per bolt to my topology (essentially more threads). I measure scalability through the latency of each operator (bolt) in my topology. To measure latency, I try to estimate it through the tick-tuple functionality that Storm provides. My bolts are configured to receive tick tuples every TICK_TUPLE_FREQ_SEC seconds, by using the following function:
>>>
>>> @Override
>>> public Map<String, Object> getComponentConfiguration() {
>>>     Config conf = new Config();
>>>     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
>>>         SynefoJoinBolt.TICK_TUPLE_FREQ_SEC);
>>>     return conf;
>>> }
>>>
>>> So, every TICK_TUPLE_FREQ_SEC seconds each bolt receives a tick tuple. I recognize tick tuples in my execute() function using the following method:
>>>
>>> private boolean isTickTuple(Tuple tuple) {
>>>     String sourceComponent = tuple.getSourceComponent();
>>>     String sourceStreamId = tuple.getSourceStreamId();
>>>     return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID) &&
>>>         sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
>>> }
>>>
>>> After I receive a tick tuple, I calculate latency by subtracting both the timestamp of the previous tick tuple and TICK_TUPLE_FREQ_SEC seconds from the current timestamp.
>>> In other words, I do the following in my execute() function:
>>>
>>> public void execute(Tuple tuple) {
>>>     Long currentTimestamp = System.currentTimeMillis();
>>>     /**
>>>      * Check if it is a tick tuple
>>>      */
>>>     if (isTickTuple(tuple) && warmFlag) {
>>>         /**
>>>          * Check latency
>>>          */
>>>         if (latestTimestamp == -1) {
>>>             latestTimestamp = currentTimestamp;
>>>         } else {
>>>             Long timeDifference = currentTimestamp - latestTimestamp;
>>>             String latencySpecifics = latestTimestamp + "-" +
>>>                 currentTimestamp + "-" + timeDifference;
>>>             latestTimestamp = currentTimestamp;
>>>             if (timeDifference >= (TICK_TUPLE_FREQ_SEC * 1000)) {
>>>                 statistics.updateWindowLatency(timeDifference -
>>>                     (TICK_TUPLE_FREQ_SEC * 1000));
>>>                 latencyMetric.setValue(timeDifference -
>>>                     (TICK_TUPLE_FREQ_SEC * 1000));
>>>                 latencyDetailsMetric.setValue(latencySpecifics);
>>>             }
>>>         }
>>>         collector.ack(tuple);
>>>         return;
>>>     }
>>>
>>>     // Rest of the code...
>>> }
>>>
>>> The variable latestTimestamp starts out as -1, so the first tick tuple only records a timestamp. In order to keep track of latency, I make use of Storm's metrics framework.
>>>
>>> The puzzling part is that the more executors I add to my topology, the more I see latency increasing. In fact, latency reaches levels that do not make sense. My cluster consists of 3 AWS m4.xlarge nodes for ZooKeeper and Nimbus, and 4 AWS m4.xlarge Supervisor nodes (each one with 4 worker slots). My input load is approximately 64K tuples per second (averaging close to 32 bytes each), and I am using direct grouping between bolts and spouts (I have to use it for my algorithm - I know shuffle grouping or local grouping might be better, but not for my use-case).
>>>
>>> I have attached two PDFs illustrating the latency values I measure over time. One depicts the setup where I use 1 executor per bolt, and the other 4 executors per bolt. Each marker-line shows the latency reported by a task as time progresses. On the PDF with one executor there is only one line, because only one task is executed by a single executor. On the PDF with 4 executors you can see the latency reported by each task, each executed by a different executor. As you can see, the measured latency is enormous and frankly does not make sense. It is also confusing that using one executor per bolt shows less latency than using 4 executors per bolt.
>>>
>>> Any ideas what is going on? Am I doing something wrong in measuring latency? Am I missing something? By the way, the throughput of each bolt is close to 5K tuples per second, so those latency numbers do not really make sense.
>>>
>>> Thank you,
>>> Nick
>>>
>>> P.S.: The phenomenon where fewer executors show less latency than more executors also happens at smaller input rates.
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD candidate
