Thanks @Stig for the detailed explanation. I have not yet used the
KafkaSpout; I was just going through the code to understand it. I will try
the fixes too. In the topologies I run, I rarely hit a scenario where
the spout is limiting the throughput. Still, I will try to do some
benchmarking with a dummy topology and update you on my experience.
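
For reference, the separate-thread idea discussed below would look roughly
like this. It is only a sketch under assumptions: DecoupledPoller and its
Supplier source are hypothetical names standing in for a thread that calls
KafkaConsumer.poll(); none of this is KafkaSpout code.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: one dedicated thread polls a source and hands records
 * to the spout thread through a bounded queue.
 */
public class DecoupledPoller<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    // Stand-in for consumer.poll(timeout); assumed to block briefly when idle.
    private final Supplier<List<T>> source;
    private final Thread pollerThread;
    private volatile boolean running = true;

    public DecoupledPoller(Supplier<List<T>> source, int capacity) {
        this.source = source;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.pollerThread = new Thread(this::pollLoop, "poller");
        this.pollerThread.setDaemon(true);
    }

    public void start() {
        pollerThread.start();
    }

    private void pollLoop() {
        try {
            while (running) {
                for (T record : source.get()) {
                    // put() blocks while the queue is full, which gives
                    // back pressure towards the polling side.
                    queue.put(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Meant to be called from nextTuple(): never blocks, null if nothing is buffered. */
    public T next() {
        return queue.poll();
    }

    @Override
    public void close() {
        running = false;
        pollerThread.interrupt();
        try {
            pollerThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Since KafkaConsumer is not safe for use from multiple threads, acks and
commits would have to travel back to the poller thread through a second
queue, which is the extra cost Stig points out.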

Thanks a lot again.
Chandan

On Tue, Jul 11, 2017 at 3:46 AM, Stig Døssing <generalbas....@gmail.com>
wrote:

> > It is the spout thread which
> > is polling (consuming) and iterating (processing) over the polled
> > messages.
> > If we separate the consumption in another thread and push messages in a
> > queue, iterating (processing) is now concurrent and decoupled
>
> Sure, but the bookkeeping done by the spout before emitting the polled
> messages should be very lightweight, and pushing the messages (and
> returning acks/fails) through another queue system isn't free. I'm just not
> convinced that avoiding short duration blocks by running the consumer in a
> separate thread has any benefit, but I'd be happy to see benchmarks.
>
> > The auto spout block/wait when no messages are emitted makes absolute sense
> > but a different scenario, less likely in a mostly loaded topology.
>
> Keep in mind that the consumer blocking on poll is also less likely in a
> loaded topology, because the block only happens if either Kafka has no more
> new messages, or the consumer prefetching fails to fetch messages quickly
> enough.
>
> > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> > calls seriously, taking into account its critical role
>
> I think it's a good recommendation, but like I said I understand it to be
> warning about blocking for long periods. Besides, the user can configure
> how long of a block in poll they will accept for the KafkaSpout, so if the
> current default timeout (200ms) is causing someone issues, it can always be
> lowered.
>
> > I am especially thinking of the failure case
> > which, for some reason, is recurrent;  the same message which does not get
> > acked.
>
> This case is something the user needs to deal with regardless of how we
> implement the spout. If a message is repeatedly failing, the choice will be
> between repeatedly retrying that message until it finally succeeds, or
> dropping that message and moving on. Either option is something the user
> can configure via the RetryService.
>
> > We cannot commit unless
> > that failed message is finally acked after few retries. Due to which, we
> > don't even process any new set of records.
> >
> This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
> the failed message is fetched and retried, the spout should proceed past
> the failed message and emit new tuples. The only reasons the spout should
> stop emitting new tuples are if it hits the cap on uncommitted offsets
> (configurable, see
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L47),
> or if the failed message is so far back in the message stream that the
> spout seeks back to the failed message, and then spends a long time
> fetching and discarding already emitted tuples while catching up to where
> it left off.
>
> There are bugs in the max uncommitted offsets mechanism right now that can
> cause a spout to stop retrying tuples (some fixed here
> https://issues.apache.org/jira/browse/STORM-2343, some still pending
> https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
> this). I think the behavior you're seeing is caused by part of STORM-2343,
> which is not fixed in 1.1.0. Basically in 1.1.0 when there was a failed
> tuple, the spout would always seek to the committed offset for that
> partition in order to get the failed tuple. This caused some issues
> relating to being unable to retry messages that were too far from the
> committed offset, and unnecessarily polling the messages next to the
> committed offset when we didn't need to. We've changed it so we seek to the
> offset we want to retry instead, which avoids these issues. See
> https://github.com/apache/storm/pull/1924/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR302
> for details. You may want to try checking out 1.x-branch and building the
> spout yourself to see if this solves your issue.
>
> 2017-07-10 23:08 GMT+02:00 chandan singh <cks07...@gmail.com>:
>
> > Hi Hugo
> >
> > Hope I do not come across as arguing for its own sake.
> >
> > 1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
> > threads. We can use a single thread which is different from spout thread.
> > That thread and spout will share a queue of ConsumerRecords.
> >
> > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> > calls seriously, taking into account its critical role. I still believe it
> > is a sound recommendation, but maybe there are other factors, like topology
> > throughput, which might be more limiting than the spout making a Kafka
> > consumer poll.
> >
> > 2.1 There is no room for an optimization which affects the normal
> > (non-failure) case negatively. I was just suggesting that we optimize the
> > failure case if we can do so without a trade-off. Failure processing is
> > quite independent of successfully acked messages. I am especially thinking
> > of the failure case which, for some reason, is recurrent;  the same message
> > which does not get acked. I agree that it is much better to address the
> > issue in the application.
> >
> > 2.2 As you have correctly mentioned, subsequently polled messages
> > (previously acked/emitted) are no longer emitted. We cannot commit unless
> > that failed message is finally acked after few retries. Due to which, we
> > don't even process any new set of records.  We keep the current set of
> > messages (polled repeatedly) in memory anyway.  All we are doing is polling
> > the same set of messages and iterating until that one failure has been
> > rescheduled N times (worse with exponential back off).
> >
> > Anyway, we are dealing with rare scenarios which should get the least
> > priority especially if it introduces complexity.
> >
> > On a different note, sending arbitrarily large messages to Kafka is
> > a problem in itself.
> >
> > @Stig I will think about it further and evaluate the need for
> > optimization.
> >
> > Thanks
> > Chandan
> >
> > On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <cks07...@gmail.com>
> > wrote:
> >
> > > 1) Agreed that short duration blocks are less likely to cause an issue,
> > > but it will be much better to avoid them. Anyway, a conclusion is not easy
> > > without some benchmark. I will let you know if I am able to do some
> > > volume testing on both options and observe a significant difference.
> > >
> > > The auto spout block/wait when no messages are emitted makes absolute
> > > sense but a different scenario, less likely in a mostly loaded
> > > topology.
> > >
> > > You are absolutely correct about decoupling the consumption from
> > > processing when you see it from the perspective of Storm. I think there
> > > is a very subtle difference when we keep Kafka in mind. It is the spout
> > > thread which is polling (consuming) and iterating (processing) over the
> > > polled messages. If we separate the consumption in another thread and push
> > > messages in a queue, iterating (processing) is now concurrent and
> > > decoupled. Anyway, I too feel there should not be any significant
> > > difference in throughput but it will be interesting to measure the
> > > difference.
> > >
> > > I went overboard while mentioning seek before every poll when I had the
> > > failure scenario in mind.
> > >
> > > Thanks a lot. I will keep you guys updated on the switch from custom
> > > spout to KafkaSpout.
> > >
> > > Thanks for the amazing work.
> > >
> > > Chandan
> > >
> > > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <cks07...@gmail.com>
> > > wrote:
> > >
> > >> Sorry about being cryptic there. What I meant is that it will be much
> > >> better if we don't make assumptions about failure rates in
> > >> topologies. I know it is more a matter of common sense, but out of
> > >> curiosity, can you point me to any Storm documentation which comments
> > >> on preferable failure rates? I was suggesting that if we can offer the
> > >> user an optimization through a clean API, the user will be free to
> > >> decide on the rationale for using it.
> > >>
> > >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
> > >> ev...@yahoo-inc.com.invalid> wrote:
> > >>
> > >>> I'm not sure what assumptions you want to make that this is
> > >>> preventing, or why they would be helpful.
> > >>>
> > >>> - Bobby
> > >>>
> > >>>
> > >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
> > >>> cks07...@gmail.com> wrote:
> > >>>
> > >>> Hi Stig & Bobby
> > >>>
> > >>> Thanks for confirming my understanding.
> > >>>
> > >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are
> > >>> non-blocking has been a guideline on
> > >>> http://storm.apache.org/releases/1.1.0/Concepts.html
> > >>> for a long time. Copying verbatim here: "The main method on spouts is
> > >>> nextTuple. nextTuple either emits a new tuple into the topology or
> > >>> simply returns if there are no new tuples to emit. It is imperative
> > >>> that nextTuple does not block for any spout implementation, because
> > >>> Storm calls all the spout methods on the same thread." I admit that
> > >>> there is some chance my interpretation is partially incorrect, but I
> > >>> have been following it in a custom spout till now. Even though the
> > >>> objective is different, there is a similar hint in the official Kafka
> > >>> documentation. Please see under the heading "2. Decouple Consumption
> > >>> and Processing" on
> > >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
> > >>> Essentially, a thread polls Kafka and the spout thread gets the
> > >>> messages through a shared queue. If pre-fetching is present in Kafka
> > >>> (I will read about it further), I assume we do not have to fetch in
> > >>> another thread, but I am not sure how the pre-fetching behaves with
> > >>> re-seeking before every poll.
> > >>>
> > >>> 2) @Bobby, you are correct in pointing out what needs to be optimized,
> > >>> but the facts, sometimes, prevent us from making assumptions. We do
> > >>> optimize our retry loop such that we don't poll the messages again. I
> > >>> especially see problems when combined with exponential back off. I am
> > >>> not sure how difficult or clean it will be to expose some sort of
> > >>> configuration to allow such optimization. Do you think it will be
> > >>> worth trying out something?
> > >>>
> > >>> Thanks
> > >>> Chandan
> > >>>
> > >>
> > >>
> > >
> >
>
