1) It is true that nextTuple implementations should try to avoid blocking
as much as possible, but the time nextTuple may block in poll is capped by
a configuration parameter
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L304.


The guideline on the concepts page is (I think) intended to convey that
implementations should avoid long or indefinite duration blocking in
nextTuple. The concern here is that Storm can't call ack or fail or any
other spout method while the spout thread is executing nextTuple. This can
break tuple timeouts, because an acked tuple sitting in the spout's queue
can still end up timing out if the spout thread is blocked in nextTuple. It
is also preferable if we can avoid the spout's input queue getting full. As
long as the call to nextTuple returns within a reasonable timeframe,
blocking won't cause these problems.

For example, it would be a bad idea to implement a spout that blocks in
nextTuple until more messages are available, because acked tuples could
then sit in the spout's queue waiting for nextTuple to return before they
are properly acked, which could cause them to time out erroneously. If the
spout instead blocks only for a very short period, this problem is avoided.
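To illustrate the difference, here is a hypothetical sketch (not the real Storm spout API; the class, method names, and the 200 ms timeout are made up for illustration) contrasting an indefinitely blocking nextTuple with one that blocks for a short, bounded time:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical spout-like class: messages arrive on an internal queue,
// and nextTuple drains them.
public class BoundedBlockSpout {
    public final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    // BAD: blocks until a message arrives. While we sit here, Storm cannot
    // call ack() or fail() on this thread, so acked tuples may time out.
    public String nextTupleBlockingForever() throws InterruptedException {
        return incoming.take();
    }

    // BETTER: block for at most 200 ms, then return control to Storm so it
    // can deliver pending acks/fails on the same thread.
    public String nextTupleBounded() {
        try {
            return incoming.poll(200, TimeUnit.MILLISECONDS); // null if nothing arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BoundedBlockSpout spout = new BoundedBlockSpout();
        long start = System.nanoTime();
        String msg = spout.nextTupleBounded(); // queue is empty: returns null after ~200 ms
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("got=" + msg + " after " + elapsedMs + " ms");
    }
}
```

The bounded variant still avoids busy-spinning (it sleeps up to 200 ms when idle) while keeping the spout thread responsive.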

Very short duration blocking is also what Storm itself will do by default
if nextTuple returns no new tuples. This avoids excessive calls to
nextTuple burning CPU cycles for nothing when there are no new tuples to
emit:
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java#L36.
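The idea behind that wait strategy can be sketched roughly like this (a simplified stand-in, not the actual SleepSpoutWaitStrategy code; the 1 ms value and method names here are illustrative, and the real sleep time is configurable):

```java
// Simplified sketch of a sleep-based spout wait strategy: when nextTuple
// emits nothing, sleep briefly before the executor calls it again, so an
// idle spout does not spin the CPU at 100%.
public class SleepWaitSketch {
    static final long SLEEP_MS = 1; // illustrative; the real value is a config parameter

    // Called once per "empty" call to nextTuple. The streak count would let a
    // fancier strategy back off progressively; this sketch sleeps a fixed time.
    public static void emptyEmit(long emptyStreak) {
        try {
            Thread.sleep(SLEEP_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long emptyStreak = 0;
        for (int i = 0; i < 5; i++) {
            boolean emitted = false; // pretend nextTuple emitted no tuples
            if (!emitted) {
                emptyEmit(++emptyStreak); // back off briefly
            } else {
                emptyStreak = 0; // reset once tuples flow again
            }
        }
        System.out.println("slept " + emptyStreak + " times while idle");
    }
}
```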


If you're interested in the details of how this works, take a look at
https://github.com/apache/storm/blob/d7c781891fd0cf409335ed1b70397e3c6747475e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L139
to see the loop Storm runs for spout components.

The only potentially long block we have in nextTuple is the commit
operation, which seems reasonably innocent to me. I'm not aware of any
reason that call would block for a long time, unless there's a problem with
the connection to Kafka. I'd be happy to be corrected, but I can't think of
a situation where commit would block for a long time and where we would
also benefit from not blocking the spout thread.

Regarding decoupling consumption and processing, this is already the case.
Consumption is the spout's responsibility, while processing is handled by
the rest of the topology (the bolts). The spout polls Kafka and puts the
messages in the shared topology queue by emitting them. The bolts then
process the messages. Since the bolts run in different threads than the
spouts, consumption and processing are already decoupled.

Regarding how prefetching interacts with seeking: I haven't looked into how
prefetching behaves when we seek to a new offset, but I'd just like to
correct you a bit: we don't seek before every poll. We only seek if there
are failed tuples ready for retry, and then only on the partitions
containing those tuples. Most calls to poll happen with no preceding seek.
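A rough sketch of that behavior (this is not the actual KafkaSpout code; the class, field, and method names are invented for illustration) might look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the seek behavior described above: before polling,
// seek ONLY the partitions that have failed tuples ready for retry; every
// other partition keeps its current position, so prefetching is undisturbed.
public class RetrySeekSketch {
    // partition -> earliest offset that needs to be replayed
    public static Map<String, Long> retriableOffsets = new HashMap<>();
    public static List<String> seekLog = new ArrayList<>();

    static void seek(String partition, long offset) {
        seekLog.add(partition + "@" + offset); // stand-in for consumer.seek(...)
    }

    public static void preparePoll() {
        for (Map.Entry<String, Long> e : retriableOffsets.entrySet()) {
            seek(e.getKey(), e.getValue()); // seek only partitions with retries
        }
        retriableOffsets.clear();
        // ... a real spout would now call consumer.poll(timeout);
        // most polls run with no seeks having happened at all.
    }

    public static void main(String[] args) {
        preparePoll();                        // no failed tuples: no seeks
        System.out.println("seeks=" + seekLog);
        retriableOffsets.put("topic-0", 42L); // one failed tuple on topic-0
        preparePoll();                        // seeks only topic-0
        System.out.println("seeks=" + seekLog);
    }
}
```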

2017-07-10 19:13 GMT+02:00 chandan singh <cks07...@gmail.com>:

> Hi Stig & Bobby
>
> Thanks for confirming my understanding.
>
> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
> has been a guideline on http://storm.apache.org/releases/1.1.0/Concepts.html
> for a long time. Copying it verbatim here: "The main method on spouts is nextTuple.
> nextTuple either emits a new tuple into the topology or simply returns if
> there are no new tuples to emit. It is imperative that nextTuple does not
> block for any spout implementation, because Storm calls all the spout
> methods on the same thread." I admit that there is some chance my
> interpretation is partially incorrect but I have been following it in a
> custom spout till now. Even though the objective is different, there is a
> similar hint in the official Kafka documentation. Please see the heading "2.
> Decouple Consumption and Processing" on
> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
> Essentially, a thread polls Kafka and the spout thread gets the messages
> through a shared queue. If pre-fetching is present in Kafka (I will read
> about it further), I assume we do not have to fetch in another thread, but I
> am not sure how the pre-fetching behaves with re-seeking before every
> poll.
>
> 2) @Bobby, you are correct in pointing out what needs to be optimized, but
> the facts sometimes prevent us from making assumptions. We do optimize our
> retry loop so that we don't poll the messages again. I especially see
> problems when combined with exponential back-off. I am not sure how
> difficult or clean it will be to expose some sort of configuration to allow
> such optimization. Do you think it will be worth trying out something?
>
> Thanks
> Chandan
>
