> It is the spout thread which
> is polling (consuming) and iterating (processing) over the polled messages.
> If we separate the consumption in another thread and push messages in a
> queue, iterating (processing) is now concurrent and decoupled

Sure, but the bookkeeping done by the spout before emitting the polled
messages should be very lightweight, and pushing the messages (and
returning acks/fails) through another queue system isn't free. I'm just not
convinced that avoiding short duration blocks by running the consumer in a
separate thread has any benefit, but I'd be happy to see benchmarks.
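
For anyone who wants to benchmark this, the design being proposed could be modelled roughly as below. This is only a sketch under stated assumptions: the class and method names are hypothetical, String stands in for a ConsumerRecord, and no Kafka or Storm API is involved. It shows the extra queue hop that every message has to pass through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the proposal: String stands in for a Kafka ConsumerRecord.
final class QueueHandoffSketch {
    // Bounded queue shared between the polling thread and the spout thread.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);

    // Runs on a dedicated thread; put() blocks only that thread when the queue is full.
    Thread startPoller(List<String> source) {
        Thread poller = new Thread(() -> {
            try {
                for (String record : source) {
                    queue.put(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();
        return poller;
    }

    // What the spout thread would call: poll() never blocks and returns null when empty.
    String nextOrNull() {
        return queue.poll();
    }

    // Drains every record through the queue and returns them in arrival order.
    static List<String> drainAll(List<String> source) {
        QueueHandoffSketch sketch = new QueueHandoffSketch();
        Thread poller = sketch.startPoller(source);
        List<String> emitted = new ArrayList<>();
        while (emitted.size() < source.size()) {
            String record = sketch.nextOrNull();
            if (record != null) {
                emitted.add(record);
            } else {
                Thread.yield(); // nothing queued yet, let the poller run
            }
        }
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }
}
```

A real version would also need a path for acks/fails and seeks to travel back to the consumer thread, which is the part I expect to add the most overhead and complexity.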

> The auto spout block/wait when no messages are emitted makes absolute sense
> but a different scenario, less likely in a mostly loaded topology.

Keep in mind that the consumer blocking on poll is also less likely in a
loaded topology, because the block only happens if either Kafka has no more
new messages, or the consumer prefetching fails to fetch messages quickly
enough.

> 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> calls seriously, taking into account its critical role

I think it's a good recommendation, but like I said, I understand it to be
a warning about blocking for long periods. Besides, the user can configure
how long a block in poll they will accept for the KafkaSpout, so if the
current default timeout (200ms) is causing someone issues, it can always be
lowered.

> I am especially thinking of the failure case
> which, for some reason, is recurrent;  the same message which does not get
> acked.

This case is something the user needs to deal with regardless of how we
implement the spout. If a message is repeatedly failing, the choice will be
between repeatedly retrying that message until it finally succeeds, or
dropping that message and moving on. Either option is something the user
can configure via the RetryService.
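
To make the retry-or-drop choice concrete, here is a minimal sketch of a bounded retry policy with a doubling backoff. It is a hypothetical model, not the RetryService interface or the backoff formula used by storm-kafka-client; the class name, parameters and doubling rule are all assumptions made for illustration.

```java
// Hypothetical model of a bounded retry policy; not the storm-kafka-client API.
final class RetryPolicySketch {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxRetries;

    RetryPolicySketch(long initialDelayMs, long maxDelayMs, int maxRetries) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxRetries = maxRetries;
    }

    // True while the message should be rescheduled; false means drop it and move on.
    boolean shouldRetry(int failCount) {
        return failCount <= maxRetries;
    }

    // Delay before retry number failCount (1-based): doubles each time, capped at maxDelayMs.
    long delayMs(int failCount) {
        long delay = initialDelayMs;
        for (int i = 1; i < failCount && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```

With a large maxRetries the message is effectively retried until it succeeds; with a small one it is dropped after a few attempts.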

> We cannot commit unless
> that failed message is finally acked after few retries. Due to which, we
> don't even process any new set of records.

This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
the failed message is fetched and retried, the spout should proceed past
the failed message and emit new tuples. The only reasons the spout should
stop emitting new tuples are if it hits the cap on uncommitted offsets
(configurable, see
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L47),
or if the failed message is so far back in the message stream that the
spout seeks back to the failed message, and then spends a long time
fetching and discarding already emitted tuples while catching up to where
it left off.

There are bugs in the max uncommitted offsets mechanism right now that can
cause a spout to stop retrying tuples (some fixed here
https://issues.apache.org/jira/browse/STORM-2343, some still pending
https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
this). I think the behavior you're seeing is caused by part of STORM-2343,
which is not fixed in 1.1.0. Basically, in 1.1.0, when there was a failed
tuple, the spout would always seek to the committed offset for that
partition in order to get the failed tuple. This caused two issues:
messages that were too far from the committed offset could not be retried,
and the messages next to the committed offset were polled again when we
didn't need them. We've changed it so we seek to the offset we want to
retry instead, which avoids these issues. See
https://github.com/apache/storm/pull/1924/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR302
for details. You may want to try checking out 1.x-branch and building the
spout yourself to see if this solves your issue.
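
The difference between the two seek strategies can be illustrated with a small model of a single poll. This is only a sketch: offsets are plain longs, the class and method names are hypothetical, and the numbers in the usage note are made up. It does not call the Kafka consumer.

```java
// Hypothetical model of one poll after a seek; no Kafka client involved.
final class SeekSketch {
    // Returns {firstOffset, lastOffset} of the batch one poll would return after seeking.
    static long[] pollAfterSeek(long seekOffset, int maxPollRecords, long logEndOffset) {
        long last = Math.min(seekOffset + maxPollRecords, logEndOffset) - 1;
        return new long[] {seekOffset, last};
    }

    // True if the given offset falls inside the polled batch.
    static boolean batchContains(long[] batch, long offset) {
        return offset >= batch[0] && offset <= batch[1];
    }
}
```

For example, with a committed offset of 100, a failed offset of 5000 and at most 500 records per poll, seeking to the committed offset yields offsets 100 to 599, which does not include the failed message, while seeking to the failed offset yields 5000 to 5499, which does.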

2017-07-10 23:08 GMT+02:00 chandan singh <cks07...@gmail.com>:

> Hi Hugo
>
> Hope I do not come across as arguing for its own sake.
>
> 1.1 Agreed but I was not suggesting to use KafkaConsumer from multiple
> threads. We can use a single thread which is different from spout thread.
> That thread and spout will share a queue of ConsumerRecords.
>
> 1.2 Agreed again. I had taken the spout recommendation about non-blocking
> calls seriously, taking into account its critical role. I still believe it
> is a sound recommendation but maybe there are other factors like topology
> throughput which might be more limiting than the spout making a Kafka
> consumer poll.
>
> 2.1 There is no room for an optimization which affects normal (non-failure)
> case negatively. I was just suggesting if we can optimize the failure case
> without a trade-off. Failure processing is quite independent of
> successfully acked messages. I am especially thinking of the failure case
> which, for some reason, is recurrent;  the same message which does not get
> acked. I agree that it is much better to address the issue in the
> application.
>
> 2.2 As you have correctly mentioned, subsequently polled messages
> (previously acked/emitted) are no longer emitted. We cannot commit unless
> that failed message is finally acked after few retries. Due to which, we
> don't even process any new set of records.  We anyway keep the current set
> of messages (polled repeatedly) in memory.  All we are doing is polling the
> same set of messages and iterating till that one failure is rescheduled for
> N times (worse with exponential back off).
>
> Anyway, we are dealing with rare scenarios which should get the least
> priority especially if it introduces complexity.
>
> On a different note, sending arbitrarily (large) sized messages to Kafka is
> a problem in itself.
>
> @Stig I will think about it further and evaluate the need for optimization.
>
> Thanks
> Chandan
>
> On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <cks07...@gmail.com> wrote:
>
> > 1) Agreed that short duration blocks are less likely to cause an issue but
> > it will be much better to avoid them. Anyway, a conclusion is not easy
> > without some benchmark. I will let You know if I am able to do some volume
> > testing on both options and observe significant difference.
> >
> > The auto spout block/wait when no messages are emitted makes absolute
> > sense but a different scenario, less likely in a mostly loaded topology.
> >
> > You are absolutely correct about decoupling the consumption from
> > processing when you see it from the perspective of Storm. I think there is
> > a very subtle difference when we keep Kafka in mind. It is the spout thread
> > which is polling (consuming) and iterating (processing) over the polled
> > messages. If we separate the consumption in another thread and push
> > messages in a queue, iterating (processing) is now concurrent and
> > decoupled. Anyway, I too feel there should not be any significant
> > difference in throughput but it will be interesting to measure the
> > difference.
> >
> > I went overboard while mentioning seek before every poll when I had the
> > failure scenario in mind.
> >
> > Thanks a lot. I will keep you guys updated on the switch from custom spout
> > to KafkaSpout.
> >
> > Thanks for the amazing work.
> >
> > Chandan
> >
> > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <cks07...@gmail.com> wrote:
> >
> >> Sorry about being cryptic there. What I meant is that it will be much
> >> better if we don't make assumptions about frequency of failure rates in
> >> topologies. I know it is more of a commonsense but out of curiosity, can
> >> you point me to any Storm documentation which makes a comment on preferable
> >> failure rates. I was suggesting if we can offer the user an optimization
> >> through clean API, the user will be free to decide on the rationale of
> >> using it.
> >>
> >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:
> >>
> >>> I'm not sure what assumptions you want to make that this is preventing,
> >>> or why they would be helpful.
> >>>
> >>> - Bobby
> >>>
> >>>
> >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <cks07...@gmail.com> wrote:
> >>>
> >>> Hi Stig & Bobby
> >>>
> >>> Thanks for confirming my understanding.
> >>>
> >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking
> >>> has been a guideline on
> >>> http://storm.apache.org/releases/1.1.0/Concepts.html
> >>> for long. Copying verbatim here: "The main method on spouts is nextTuple.
> >>> nextTuple either emits a new tuple into the topology or simply returns if
> >>> there are no new tuples to emit. It is imperative that nextTuple does not
> >>> block for any spout implementation, because Storm calls all the spout
> >>> methods on the same thread." I admit that there is some chance my
> >>> interpretation is partially incorrect but I have been following it in a
> >>> custom spout till now. Even though the objective is different, there is a
> >>> similar hint on Kafka official documentation. Please see under heading
> >>> "2. Decouple Consumption and Processing" on
> >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
> >>> Essentially, a thread polls Kafka and spout thread gets the messages
> >>> through a shared queue. If pre-fetching is present in Kafka (I will read
> >>> about it further), I assume we do not have to fetch in another thread but I
> >>> am not sure how the pre-fetching behaves with re-seeking before every
> >>> poll.
> >>>
> >>> 2) @Bobby, you are correct in pointing what needs to be optimized but the
> >>> facts, sometimes, prevent us from making assumptions. We do optimize our
> >>> retry loop such that we don't poll the messages again. I especially see
> >>> problems when combined with exponential back off.  I am not sure how
> >>> difficult or clean will it be to expose some sort of configuration to allow
> >>> such optimization?  Do you think it will be worth trying out something?
> >>>
> >>> Thanks
> >>> Chandan
> >>>
> >>
> >>
> >
>
