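Your ack sequence is right, but the complete latency (which is what the
timeout is checked against) is not just the simple sum of these times; it
also includes time spent waiting in queues.  Consider a simple topology
similar to yours: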

spout s1 -> bolt b1, with 1 s1 task and 1 b1 task

s1 emits very quickly (quickly enough that the time taken doesn't matter
for the example)
b1 takes 100 ms to process

If max spout pending is 1000, then the spout can have up to 1000 emitted
messages waiting to be acked at any time.  If we emit 1000 messages right
away, the spout task reaches max spout pending, and the 1000th message has
to wait for all 999 messages in front of it to be processed before b1
executes it.  Therefore its complete latency will be nearly 100s (999
messages * 100ms / message in b1, plus its own 100ms).  If I keep emitting
at the spout, then all subsequent messages will also have nearly 100s
complete latency, because each one is emitted only when an earlier message
is acked and so always joins a queue of about 1000.  You can see in this
example that if you lower the max spout pending to 100, then message 100
would have an expected complete latency of about 10s, and all subsequent
messages would be the same.
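
To make the arithmetic concrete, here is a rough Python sketch of the s1 -> b1
example.  It is only a toy model, not Storm code: the function name
complete_latencies and its parameters are made up for illustration, and it
assumes that emitting is instantaneous, that b1 handles one message at a time
in FIFO order, and that the ack reaches the spout the moment b1 finishes.
Times are integer milliseconds.

```python
from collections import deque


def complete_latencies(max_pending, service_ms, n_messages):
    """Complete latency (ms) of each message for one spout task and one bolt task.

    Toy model (assumptions, not Storm behaviour): emits take no time, the bolt
    processes messages one at a time in FIFO order, and acks arrive instantly.
    """
    pending = deque()   # finish times of messages emitted but not yet acked
    bolt_free_at = 0    # time at which the bolt has drained its current backlog
    now = 0             # spout clock
    latencies = []
    for _ in range(n_messages):
        if len(pending) == max_pending:
            # Spout is throttled: wait until the oldest pending message is acked.
            now = max(now, pending.popleft())
        start = max(now, bolt_free_at)  # wait behind everything already queued
        finish = start + service_ms
        bolt_free_at = finish
        pending.append(finish)
        latencies.append(finish - now)  # latency is counted from the emit
    return latencies


if __name__ == "__main__":
    for cap in (1000, 100):
        lat = complete_latencies(cap, 100, 3000)
        # seconds for the message that fills the cap, and for the last message
        print(cap, lat[cap - 1] / 1000, lat[-1] / 1000)
    # prints:
    # 1000 100.0 100.0
    # 100 10.0 10.0
```

In this model the steady-state latency is simply max_pending * service_ms,
which is why lowering the cap lowers the complete latency without changing
how fast b1 drains messages.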

You do have to be careful because if topology.max.spout.pending is too low
you can cause your topology to be starved for data.  I think the general
recommendation is to set it to some initial value like 1024 and keep
increasing it until you stop gaining throughput.  For your topology where
the bolts seem to have high processing times you could probably start with
a lower value than 1024.
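
The "keep increasing it until you stop gaining throughput" procedure can be
sketched as a small loop.  This is only an illustration: measure_throughput
is a hypothetical callable standing in for "run the topology with this
topology.max.spout.pending value and read the throughput", and the doubling
step and the 5% minimum gain are my own assumptions, not Storm
recommendations.

```python
def tune_max_pending(measure_throughput, start=1024, min_gain=0.05, limit=1 << 20):
    """Double max spout pending until throughput stops improving by min_gain.

    measure_throughput is a hypothetical callback: it takes a candidate value
    and returns the observed throughput (e.g. acked tuples per second).
    """
    best_value = start
    best_throughput = measure_throughput(start)
    candidate = start * 2
    while candidate <= limit:
        throughput = measure_throughput(candidate)
        if throughput < best_throughput * (1 + min_gain):
            break  # no meaningful gain: keep the previous (smaller) value
        best_value, best_throughput = candidate, throughput
        candidate *= 2
    return best_value


def toy_throughput(max_pending):
    # Made-up stand-in for a real measurement: throughput grows with the cap
    # until the bolts saturate at 4000 tuples/s.
    return min(max_pending / 2.0, 4000.0)


if __name__ == "__main__":
    print(tune_max_pending(toy_throughput))  # prints 8192
```

For a topology with slow bolts, you would pass a smaller start value, since
the bolts saturate well before 1024 messages are pending.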


On Wed, Jul 30, 2014 at 1:34 PM, Wei, Xin <xin_...@intuit.com> wrote:

>  Hi Nathan,
>
>  Thanks a lot for your explanation.  There are still some things I'm not
> very clear on:
>  1.  "The topology.max.spout.pending configures how many messages can be
> un-acked from each spout before it stops sending messages."  How does a
> message in the spout get acked?
>  In my case, the topology is like this:
> ActiveMQ -> JMS_SPOUT -> MESSAGE-FILTER_BOLT -> AGGREGATOR_BOLT ->
> OFFER_GENERATOR_BOLT
>
>  There is no cycle there.  According to my understanding, a message in
> the spout gets acked only when:
> 1). jms_spout gets the message from ActiveMQ and emits it to message_filter
> (assume this takes time t1)
> 2). MessageFilter_bolt receives it, acks it, then emits to the aggregator
> (assume time t2)
> 3). Aggregator receives it and acks it (assume time t3)
> 4). Offergenerator bolt receives it and acks it (assume time t4)
> 5). Finally the jms_spout acks this message (assume time t5)
> Only if:
> t1+t2+t3+t4+t5 <= message timeout secs,
>
>  then this message is counted as successful.  Is my understanding
> correct or not?
>
>  2. "The complete latency seems kind of high, but maybe setting max spout
> pending to a lower value would help reduce it."
>
>  Why would setting max spout pending to a lower value reduce complete
> latency?  I would think increasing the value would reduce the latency.
> In our code, I batch-read 5000 messages at a time, so 2 spouts would read
> 10000 messages at a time.  Assume I set max spout pending to 1000: if
> there are 2000 unacked messages, then the spouts would stop sending
> messages, and the remaining 8000 messages would just sit idle, waiting to
> be sent.  But if max spout pending is 10000, then the spouts will keep
> sending.  Wouldn't a max pending of 10000 be more efficient?
>
>  Or is my understanding wrong, and I should think of it this way: 2000
> messages are already un-acked, but you still keep sending messages.  To
> give an analogy, the traffic is already jammed, but you still keep the
> meter off and let all the cars flow onto the crowded highway.  The better
> way is to turn on the meter, let the cars wait a little, and let them enter
> the highway one by one.
>
>  This was a little long; thanks a lot for your patience in reading it.
>
>   From: Nathan Leung <ncle...@gmail.com>
> Reply-To: "user@storm.incubator.apache.org" <
> user@storm.incubator.apache.org>
> Date: Wednesday, July 30, 2014 at 9:44 AM
> To: user <user@storm.incubator.apache.org>
> Subject: Re: why complete latency and failure rate is so high of my spout.
>
>   The topology.max.spout.pending configures how many messages can be
> un-acked from each spout before it stops sending messages.  So in your
> example, each spout task can have 10 thousand messages waiting to be acked
> before it throttles itself and stops emitting.  Of course, if some of those
> messages are acked, then it will be able to emit more messages.  This is
> important because you do not want to have too much data pending.  If you
> have a lot of data pending, then it will increase the amount of time that
> it takes to process the message because the complete latency is counted
> starting when you emit the tuple, even if it's just waiting in the spout's
> output queue.  If the message times out without getting acked at the spout,
> then it will get counted as a failure, which is why you were seeing so many
> failures.  Changing the timeout to 90s probably also played a big role in
> reducing your failure count.
>
>  The complete latency seems kind of high, but maybe setting max spout
> pending to a lower value would help reduce it.
>
>
> On Wed, Jul 30, 2014 at 12:38 PM, Wei, Xin <xin_...@intuit.com> wrote:
>
>>  Hi There,
>>
>>  Yesterday I changed some Storm configuration settings, and the spout
>> failure rate has now dropped to 0, as shown below:
>>
>>  Topology stats
>>
>>  Window       Emitted  Transferred  Complete latency (ms)  Acked  Failed
>>  10m 0s       8766     8766         43077.391              5290   0
>>  3h 0m 0s     8766     8766         43077.391              5290   0
>>  1d 0h 0m 0s  8766     8766         43077.391              5290   0
>>  All time     8766     8766         43077.391              5290   0
>>
>>  Spouts (All time)
>>
>>  Id               Executors  Tasks  Emitted  Transferred  Complete latency (ms)  Acked  Failed  Last error
>>  JMS_QUEUE_SPOUT  2          2      5290     5290         43077.391              5290   0
>>
>>  Bolts (All time)
>>
>>  Id                    Executors  Tasks  Emitted  Transferred  Capacity (last 10m)  Execute latency (ms)  Executed  Process latency (ms)  Acked  Failed  Last error
>>  AGGREGATOR_BOLT       8          8      1738     1738         0.080                83.264                1738      81.243                1738   0
>>  MESSAGEFILTER_BOLT    8          8      1738     1738         0.091                29.833                5290      24.918                5290   0
>>  OFFER_GENERATOR_BOLT  8          8      0        0            0.031                25.993                1738      24.296                1738   0
>>
>>  The topology configuration is listed below:
>>
>>  Topology Configuration
>>
>>  Key  Value
>>  dev.zookeeper.path  /tmp/dev-storm-zookeeper
>>  drpc.childopts  -Xmx768m
>>  drpc.invocations.port  3773
>>  drpc.port  3772
>>  drpc.queue.size  128
>>  drpc.request.timeout.secs  600
>>  drpc.worker.threads  64
>>  java.library.path  /usr/local/lib
>>  logviewer.appender.name  A1
>>  logviewer.childopts  -Xmx128m
>>  logviewer.port  8000
>>  nimbus.childopts  -Xmx1024m -Djava.net.preferIPv4Stack=true
>>  nimbus.cleanup.inbox.freq.secs  600
>>  nimbus.file.copy.expiration.secs  600
>>  nimbus.host  zookeeper
>>  nimbus.inbox.jar.expiration.secs  3600
>>  nimbus.monitor.freq.secs  10
>>  nimbus.reassign  true
>>  nimbus.supervisor.timeout.secs  60
>>  nimbus.task.launch.secs  120
>>  nimbus.task.timeout.secs  30
>>  nimbus.thrift.port  6627
>>  nimbus.topology.validator  backtype.storm.nimbus.DefaultTopologyValidator
>>  storm.cluster.mode  distributed
>>  storm.id  nearline-3-1406737061
>>  storm.local.dir  /app_local/storm
>>  storm.local.mode.zmq  false
>>  storm.messaging.netty.buffer_size  5242880
>>  storm.messaging.netty.client_worker_threads  1
>>  storm.messaging.netty.max_retries  30
>>  storm.messaging.netty.max_wait_ms  1000
>>  storm.messaging.netty.min_wait_ms  100
>>  storm.messaging.netty.server_worker_threads  1
>>  storm.messaging.transport  backtype.storm.messaging.zmq
>>  storm.thrift.transport  backtype.storm.security.auth.SimpleTransportPlugin
>>  storm.zookeeper.connection.timeout  15000
>>  storm.zookeeper.port  2181
>>  storm.zookeeper.retry.interval  1000
>>  storm.zookeeper.retry.intervalceiling.millis  30000
>>  storm.zookeeper.retry.times  5
>>  storm.zookeeper.root  /storm
>>  storm.zookeeper.servers  ["zookeeper"]
>>  storm.zookeeper.session.timeout  20000
>>  supervisor.childopts  -Xmx256m -Djava.net.preferIPv4Stack=true
>>  supervisor.enable  true
>>  supervisor.heartbeat.frequency.secs  5
>>  supervisor.monitor.frequency.secs  3
>>  supervisor.slots.ports  [6700 6701 6702 6703]
>>  supervisor.worker.start.timeout.secs  120
>>  supervisor.worker.timeout.secs  30
>>  task.heartbeat.frequency.secs  3
>>  task.refresh.poll.secs  10
>>  topology.acker.executors  4
>>  topology.builtin.metrics.bucket.size.secs  60
>>  topology.debug  false
>>  topology.disruptor.wait.strategy  com.lmax.disruptor.BlockingWaitStrategy
>>  topology.enable.message.timeouts  true
>>  topology.error.throttle.interval.secs  10
>>  topology.executor.receive.buffer.size  16384
>>  topology.executor.send.buffer.size  16384
>>  topology.fall.back.on.java.serialization  true
>>  topology.kryo.decorators  []
>>  topology.kryo.factory  backtype.storm.serialization.DefaultKryoFactory
>>  topology.kryo.register
>>  topology.max.error.report.per.interval  5
>>  topology.max.spout.pending  10000
>>  topology.max.task.parallelism
>>  topology.message.timeout.secs  90
>>  topology.name  nearline
>>  topology.optimize  true
>>  topology.receiver.buffer.size  8
>>  topology.skip.missing.kryo.registrations  false
>>  topology.sleep.spout.wait.strategy.time.ms  1
>>  topology.spout.wait.strategy  backtype.storm.spout.SleepSpoutWaitStrategy
>>  topology.state.synchronization.timeout.secs  60
>>  topology.stats.sample.rate  1
>>  topology.tasks
>>  topology.tick.tuple.freq.secs
>>  topology.transfer.buffer.size  32
>>  topology.trident.batch.emit.interval.millis  500
>>  topology.tuple.serializer  backtype.storm.serialization.types.ListDelegateSerializer
>>  topology.worker.childopts
>>  topology.worker.shared.thread.pool.size  4
>>  topology.workers  4
>>  transactional.zookeeper.port
>>  transactional.zookeeper.root  /transactional
>>  transactional.zookeeper.servers
>>  ui.childopts  -Xmx768m
>>  ui.port  8080
>>  worker.childopts  -Xmx768m -Djava.net.preferIPv4Stack=false -DNEARLINE_DATA_ENV=dev -DNEARLINE_APP_ENV=dev -DNEARLINE_QUEUES_ENV=dev -Dauthfilter.appcred.default.encrypt.file=/home/xwei/FP_AppCred_Encrypt.txt -Dauthfilter.appcred.default.passphrase.file=/home/xwei/FP_AppCred_Passphrase.txt
>>  worker.heartbeat.frequency.secs  1
>>  zmq.hwm  0
>>  zmq.linger.millis  5000
>>  zmq.threads  1
>>
>> The settings I changed:
>> 1. topology.acker.executors: set to 4
>> 2. topology.max.spout.pending: changed to 10000
>> 3. topology.message.timeout.secs: changed from 30 to 90 secs
>>
>>  I think no. 2, topology.max.spout.pending, is the critical factor
>> that made the big difference.  Can anybody tell me what that setting does?
>>
>>
>>  Thanks a lot for help.
>>
>>
>>
>>
>
