@chandan 

#1 was designed to be single threaded. There are several reasons for that. 
  1.1 KafkaConsumer is single threaded. If you have multiple threads attempting 
to poll from it, you will get a ConcurrentModificationException("KafkaConsumer 
is not safe for multi-threaded access"). 
  1.2 Multiple threads are typically only advantageous when one of the threads 
is expected to have idle time, which typically comes in the form of I/O 
operations. That is not the case for the Spout. As @Stig mentioned, Storm 
already decouples consumption and processing: a Kafka ConsumerRecord is 
fetched by the spout and sent downstream in the call to nextTuple. A Spout is 
Storm’s abstraction for ingesting data into the topology. The Bolt is Storm’s 
abstraction for letting the user plug in arbitrary computation. 
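
To illustrate point 1.1, here is a minimal sketch of the kind of ownership 
check that produces this exception. It is not the KafkaConsumer source. 
GuardedClient and GuardDemo are hypothetical names, and the sketch assumes a 
simple non-reentrant guard built on a compare-and-set of the owning thread id. 

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a client that permits only one thread at a time. */
class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    /** Runs the action while holding the guard; fails fast if another thread is inside. */
    void poll(Runnable action) {
        long me = Thread.currentThread().getId();
        if (!owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                "client is not safe for multi-threaded access");
        }
        try {
            action.run();
        } finally {
            owner.set(NO_OWNER); // release the guard for the next caller
        }
    }
}

class GuardDemo {
    /** Returns true if a second thread was rejected while the first was still polling. */
    static boolean secondThreadRejected() {
        GuardedClient client = new GuardedClient();
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        // The first thread enters poll() and stays there until released.
        Thread first = new Thread(() -> client.poll(() -> {
            inside.countDown();
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        boolean rejected = false;
        try {
            inside.await();
            client.poll(() -> { }); // the calling thread tries to poll concurrently
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            done.countDown();
        }
        try {
            first.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadRejected());
    }
}
```

The check fails fast instead of locking, so a second polling thread gets an 
exception rather than waiting its turn. 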

#2 was also a deliberate design choice. 
  2.1 As @Bobby mentioned, #2 was designed to optimize for the case where 
there are no failures. Failures are expected to be rare. If there are lots of 
failures, something is wrong, e.g. problems in a downstream component/service, 
or networking problems. When optimizing, one weighs tradeoffs. The 
implementation favors the case when there are no failures, which is the most 
common case. Put another way, you want to maximize throughput during normal 
message processing, not when you anticipate that there may be some 
errors/failures. 
  2.2 The implementation was designed not to keep any records in memory. A 
Kafka ConsumerRecord can have arbitrary size, and if multiple records keep on 
failing, the memory footprint can grow increasingly large. Furthermore, if you 
keep the records in memory and the JVM running the spout crashes, you will 
have to poll them from Kafka again anyway. The Spout already does some 
optimizations to not re-emit records that come after a failed record but did 
not themselves fail. Once again, we are trying to strike a good balance 
between throughput, guarantee of delivery, and minimizing duplicates. 
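
The idea in 2.2 can be sketched roughly as follows. This is not the actual 
KafkaSpout code. RetryTracker is a hypothetical class that assumes a single 
partition and ignores in-flight tuples and retry backoff. It only shows how 
retries can work from offsets alone, without holding record payloads in memory. 

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical per-partition tracker: keeps offsets only, never record payloads. */
class RetryTracker {
    private final TreeSet<Long> acked = new TreeSet<>();
    private final TreeSet<Long> failed = new TreeSet<>();

    void ack(long offset) {
        failed.remove(offset);
        acked.add(offset);
    }

    void fail(long offset) {
        acked.remove(offset);
        failed.add(offset);
    }

    /** Offset to seek back to before the next poll, or -1 if nothing needs a retry. */
    long seekTarget() {
        return failed.isEmpty() ? -1L : failed.first();
    }

    /** Of the offsets returned by a re-poll, keeps only those not already acked. */
    List<Long> toEmit(List<Long> polledOffsets) {
        List<Long> out = new ArrayList<>();
        for (long offset : polledOffsets) {
            if (!acked.contains(offset)) {
                out.add(offset);
            }
        }
        return out;
    }
}
```

For example, if offsets 0, 1, 3 and 4 were acked and offset 2 failed, the 
sketch seeks back to 2, and a re-poll returning offsets 2 through 5 emits 
only 2 and 5. 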

Best,
Hugo

> On Jul 10, 2017, at 11:40 AM, Stig Rohde Døssing <[email protected]> 
> wrote:
> 
> 1) It is true that nextTuple implementations should try to avoid blocking
> as much as possible, but the time nextTuple may block in poll is capped by
> a configuration parameter
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L304.
> 
> 
> The guideline on the concepts page is (I think) intended to convey that
> implementations should avoid long or indefinite duration blocking in
> nextTuple. The concern here is that Storm can't call ack or fail or any
> other spout method while the spout thread is executing nextTuple. This can
> break tuple timeouts, because an acked tuple sitting in the spout's queue
> can still end up timing out if the spout thread is blocked in nextTuple. It
> is also preferable if we can avoid the spout's input queue getting full. As
> long as the call to nextTuple returns within a reasonable timeframe,
> blocking won't cause these problems.
> 
> For example, it would be a bad idea to implement a spout that blocks in
> nextTuple until more messages are available, because it would mean that
> acked tuples can end up sitting there waiting for nextTuple to return
> before they are properly acked, which could cause them to time out
> erroneously. If the spout instead only blocks for a very short period, this
> problem is avoided.
> 
> Very short duration blocking is also what Storm itself will do by default
> if nextTuple returns no new tuples. This avoids excessive calls to
> nextTuple wasting a bunch of power and heat if there are no new tuples to
> emit
> https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/spout/SleepSpoutWaitStrategy.java#L36.
> 
> 
> If you're interested in the details of how this works, take a look at
> https://github.com/apache/storm/blob/d7c781891fd0cf409335ed1b70397e3c6747475e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java#L139
> to see the loop Storm runs for spout components.
> 
> The only potentially long block we have in nextTuple is the commit
> operation, which seems reasonably innocent to me. I'm not aware of any
> reason that call would block for a long time, unless there's a problem with
> the connection to Kafka. I'd be happy to be corrected, but I can't think of
> any situation where commit blocks for a long time, but we would benefit
> from not blocking the spout thread.
> 
> Regarding decoupling consumption and processing, this is already the case.
> Consumption is the spout's responsibility, while processing is handled by
> the rest of the topology (the bolts). The spout polls Kafka and puts the
> messages in the shared topology queue by emitting them. The bolts then
> process the messages. Since the bolts run in different threads than the
> spouts, consumption and processing are already decoupled.
> 
> About how prefetching works with seeking, I haven't looked into how
> prefetching works when we seek to a new offset, but I'd just like to
> correct you a bit: We don't seek before every poll. We only seek if there
> are failed tuples ready for retry, and then only on the partitions
> containing those tuples. Most calls to poll will happen with no preceding
> seek.
> 
> 2017-07-10 19:13 GMT+02:00 chandan singh <[email protected]>:
> 
>> Hi Stig & Bobby
>> 
>> Thanks for confirming my understanding.
>> 
>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
>> has been a guideline on
>> http://storm.apache.org/releases/1.1.0/Concepts.html
>> for a long time. Copying verbatim here: "The main method on spouts is nextTuple.
>> nextTuple either emits a new tuple into the topology or simply returns if
>> there are no new tuples to emit. It is imperative that nextTuple does not
>> block for any spout implementation, because Storm calls all the spout
>> methods on the same thread." I admit that there is some chance my
>> interpretation is partially incorrect but I have been following it in a
>> custom spout till now. Even though the objective is different, there is a
>> similar hint on Kafka official documentation. Please see under heading "2.
>> Decouple Consumption and Processing" on
>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
>> Essentially, a thread polls Kafka and the spout thread gets the messages
>> through a shared queue. If pre-fetching is present in Kafka (I will read
>> about it further), I assume we do not have to fetch in another thread, but I
>> am not sure how the pre-fetching behaves with re-seeking before every
>> poll.
>> 
>> 2) @Bobby, you are correct in pointing out what needs to be optimized, but the
>> facts sometimes prevent us from making assumptions. We do optimize our
>> retry loop such that we don't poll the messages again. I especially see
>> problems when combined with exponential backoff. I am not sure how
>> difficult or clean it will be to expose some sort of configuration to allow
>> such an optimization. Do you think it would be worth trying something out?
>> 
>> Thanks
>> Chandan
>> 
