There is a set of simple topologies in storm-perf for such benchmarking… we
have one there that measures perf of the old KafkaSpout:
https://github.com/apache/storm/blob/1.x-branch/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java

We don’t have one yet for the new Kafka spout… I had written one but never got
around to contributing it back…
https://github.com/roshannaik/storm/blob/perftopos1.x/examples/storm-starter/src/jvm/org/apache/storm/starter/perf/NewKafkaSpoutNullBoltTopo.java

You can copy it into storm-perf and use it as your starting point… the main
method that submits the topo there needs to be made more like the ones we
already have in official Storm (a rough sketch of that main method follows below).
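
For reference, something like this is the shape of the main method the existing
storm-perf topologies use (modeled loosely on KafkaSpoutNullBoltTopo; the Helper,
getTopology and TOPOLOGY_NAME names are taken from that file, so treat this as an
approximation rather than exact code):

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("args: runDurationSec topoConfigFile");
            return;
        }
        int durationSec = Integer.parseInt(args[0]);
        Map topoConf = Utils.findAndReadConfigFile(args[1]);

        // Build the topology from the config, submit it to the cluster and
        // print throughput/latency metrics for the requested duration.
        Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf,
                getTopology(topoConf));
    }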

-roshan

On 7/10/17, 10:49 PM, "chandan singh" <cks07...@gmail.com> wrote:

    Thanks @Stig for the detailed explanation. I have not yet used the
    KafkaSpout; I was just going through the code to understand it. I will try
    the fixes too. In the topologies I run, I rarely create a scenario where
    the spout is limiting the throughput. Still, I will try to do some
    benchmarking with a dummy topology and update you on my experience.
    
    Thanks a lot again.
    Chandan
    
    On Tue, Jul 11, 2017 at 3:46 AM, Stig Døssing <generalbas....@gmail.com>
    wrote:
    
    > > It is the spout thread which
    > > is polling (consuming) and iterating (processing) over the polled
    > > messages.
    > > If we separate the consumption in another thread and push messages in a
    > > queue, iterating (processing) is now concurrent and decoupled
    >
    > Sure, but the bookkeeping done by the spout before emitting the polled
    > messages should be very lightweight, and pushing the messages (and
    > returning acks/fails) through another queue system isn't free. I'm just not
    > convinced that avoiding short duration blocks by running the consumer in a
    > separate thread has any benefit, but I'd be happy to see benchmarks.
    >
    > > The auto spout block/wait when no messages are emitted makes absolute
    > > sense but is a different scenario, less likely in a mostly loaded topology.
    >
    > Keep in mind that the consumer blocking on poll is also less likely in a
    > loaded topology, because the block only happens if either Kafka has no more
    > new messages, or the consumer prefetching fails to fetch messages quickly
    > enough.
    >
    > > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
    > > calls seriously, taking into account its critical role
    >
    > I think it's a good recommendation, but like I said, I understand it to be
    > a warning against blocking for long periods. Besides, the user can configure
    > how long a block in poll they will accept for the KafkaSpout, so if the
    > current default timeout (200ms) is causing someone issues, it can always be
    > lowered.
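    > For illustration, something like this (a sketch only; setPollTimeoutMs is a
    > KafkaSpoutConfig.Builder setter, but the builder factory method and the
    > broker/topic names here are placeholders, so check them against your version):
    >
    >     KafkaSpoutConfig<String, String> spoutConf =
    >         KafkaSpoutConfig.builder("broker1:9092", "my-topic")
    >             // lower the maximum time a single consumer poll may block nextTuple()
    >             .setPollTimeoutMs(50)
    >             .build();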
    >
    > > I am especially thinking of the failure case
    > > which, for some reason, is recurrent; the same message which does not
    > > get acked.
    >
    > This case is something the user needs to deal with regardless of how we
    > implement the spout. If a message is repeatedly failing, the choice will be
    > between repeatedly retrying that message until it finally succeeds, or
    > dropping that message and moving on. Either option is something the user
    > can configure via the RetryService.
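    > For example, something along these lines (a sketch; KafkaSpoutRetryExponentialBackoff
    > is the default retry service and TimeInterval is its nested class, but the exact
    > constructor arguments and the builder variable are illustrative, so verify them
    > against your storm-kafka-client version):
    >
    >     // Give up on a tuple after 5 retries instead of retrying it forever.
    >     KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
    >         TimeInterval.milliSeconds(500),  // initial delay
    >         TimeInterval.milliSeconds(2),    // delay period for the backoff
    >         5,                               // max retries before the tuple is dropped
    >         TimeInterval.seconds(10));       // max delay between retries
    >
    >     spoutConfBuilder.setRetry(retry);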
    >
    > > We cannot commit unless
    > > that failed message is finally acked after a few retries. As a result, we
    > > don't even process any new records.
    > >
    > This behavior is caused by a bug in 1.1.0, which has since been fixed. Once
    > the failed message is fetched and retried, the spout should proceed past
    > the failed message and emit new tuples. The only reasons the spout should
    > stop emitting new tuples are if it hits the cap on uncommitted offsets
    > (configurable, see
    > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L47),
    > or if the failed message is so far back in the message stream that the
    > spout seeks back to the failed message, and then spends a long time
    > fetching and discarding already emitted tuples while catching up to where
    > it left off.
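    > If the uncommitted-offsets cap is what a topology is hitting, it can be raised
    > through the same config builder (sketch only; verify the setter and a sensible
    > value against the KafkaSpoutConfig linked above):
    >
    >     // Allow more tuples to be emitted before the spout pauses to wait for commits.
    >     spoutConfBuilder.setMaxUncommittedOffsets(250000);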
    >
    > There are bugs in the max uncommitted offsets mechanism right now that can
    > cause a spout to stop retrying tuples (some fixed here
    > https://issues.apache.org/jira/browse/STORM-2343, some still pending
    > https://issues.apache.org/jira/browse/STORM-2549, we're working to fix
    > this). I think the behavior you're seeing is caused by part of STORM-2343,
    > which is not fixed in 1.1.0. Basically in 1.1.0 when there was a failed
    > tuple, the spout would always seek to the committed offset for that
    > partition in order to get the failed tuple. This caused some issues
    > relating to being unable to retry messages that were too far from the
    > committed offset, and unnecessarily polling the messages next to the
    > committed offset when we didn't need to. We've changed it so we seek to the
    > offset we want to retry instead, which avoids these issues. See
    > https://github.com/apache/storm/pull/1924/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR302
    > for details. You may want to try checking out 1.x-branch and building the
    > spout yourself to see if this solves your issue.
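    > In rough pseudo-Java terms (the variable names here are illustrative, not the
    > spout's actual fields):
    >
    >     // 1.1.0 behaviour: rewind to the last committed offset, then re-read and
    >     // discard everything between the commit point and the failed tuple.
    >     consumer.seek(partition, committedOffset);
    >
    >     // After STORM-2343: seek straight to the offset being retried.
    >     consumer.seek(partition, earliestRetriableOffset);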
    >
    > 2017-07-10 23:08 GMT+02:00 chandan singh <cks07...@gmail.com>:
    >
    > > Hi Hugo
    > >
    > > Hope I do not come across as arguing for the sake of it.
    > >
    > > 1.1 Agreed, but I was not suggesting using KafkaConsumer from multiple
    > > threads. We can use a single thread that is different from the spout thread.
    > > That thread and the spout will share a queue of ConsumerRecords (see the
    > > sketch below).
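    > > Something along these lines (a minimal sketch of the idea, not a working spout;
    > > imports, shutdown, acking and offset management are omitted, and the names are
    > > made up):
    > >
    > >     // Bounded hand-off between the consumer thread and the spout thread.
    > >     private final BlockingQueue<ConsumerRecord<String, String>> queue =
    > >             new LinkedBlockingQueue<>(10000);
    > >
    > >     // Started from open(); the only thread that ever touches the KafkaConsumer.
    > >     private void startPoller(final KafkaConsumer<String, String> consumer) {
    > >         Thread poller = new Thread(() -> {
    > >             try {
    > >                 while (!Thread.currentThread().isInterrupted()) {
    > >                     for (ConsumerRecord<String, String> r : consumer.poll(200)) {
    > >                         queue.put(r); // blocks when the spout falls behind
    > >                     }
    > >                 }
    > >             } catch (InterruptedException e) {
    > >                 Thread.currentThread().interrupt();
    > >             }
    > >         });
    > >         poller.setDaemon(true);
    > >         poller.start();
    > >     }
    > >
    > >     @Override
    > >     public void nextTuple() {
    > >         // The spout thread only drains the in-memory queue, so it never blocks on Kafka.
    > >         ConsumerRecord<String, String> r = queue.poll();
    > >         if (r != null) {
    > >             collector.emit(new Values(r.key(), r.value()), r.offset());
    > >         }
    > >     }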
    > >
    > > 1.2 Agreed again. I had taken the spout recommendation about non-blocking
    > > calls seriously, taking into account its critical role. I still believe it
    > > is a sound recommendation, but maybe there are other factors, like topology
    > > throughput, which might be more limiting than the spout making a Kafka
    > > consumer poll.
    > >
    > > 2.1 There is no room for an optimization which affects the normal
    > > (non-failure) case negatively. I was just suggesting that we optimize the
    > > failure case if we can do so without a trade-off. Failure processing is
    > > quite independent of successfully acked messages. I am especially thinking
    > > of the failure case which, for some reason, is recurrent; the same message
    > > which does not get acked. I agree that it is much better to address the
    > > issue in the application.
    > >
    > > 2.2 As you have correctly mentioned, subsequently polled messages
    > > (previously acked/emitted) are no longer emitted. We cannot commit unless
    > > that failed message is finally acked after a few retries. As a result, we
    > > don't even process any new records. We anyway keep the current set of
    > > messages (polled repeatedly) in memory. All we are doing is polling the
    > > same set of messages and iterating until that one failure has been
    > > rescheduled N times (worse with exponential back-off).
    > >
    > > Anyway, we are dealing with rare scenarios which should get the least
    > > priority, especially if it introduces complexity.
    > >
    > > On a different note, sending messages of arbitrary (large) size to Kafka
    > > is a problem in itself.
    > >
    > > @Stig I will think about it further and evaluate the need for
    > > optimization.
    > >
    > > Thanks
    > > Chandan
    > >
    > > On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <cks07...@gmail.com>
    > > wrote:
    > >
    > > > 1) Agreed that short-duration blocks are less likely to cause an issue,
    > > > but it will be much better to avoid them. Anyway, a conclusion is not easy
    > > > without some benchmark. I will let you know if I am able to do some volume
    > > > testing on both options and observe a significant difference.
    > > >
    > > > The auto spout block/wait when no messages are emitted makes absolute
    > > > sense but is a different scenario, less likely in a mostly loaded topology.
    > > >
    > > > You are absolutely correct about decoupling the consumption from
    > > > processing when you see it from the perspective of Storm. I think there
    > > > is a very subtle difference when we keep Kafka in mind. It is the spout
    > > > thread which is polling (consuming) and iterating (processing) over the
    > > > polled messages. If we separate the consumption in another thread and push
    > > > messages in a queue, iterating (processing) is now concurrent and
    > > > decoupled. Anyway, I too feel there should not be any significant
    > > > difference in throughput but it will be interesting to measure the
    > > > difference.
    > > >
    > > > I went overboard while mentioning seek before every poll when I had the
    > > > failure scenario in mind.
    > > >
    > > > Thanks a lot. I will keep you guys updated on the switch from custom
    > > > spout
    > > > to KafkaSpout.
    > > >
    > > > Thanks for the amazing work.
    > > >
    > > > Chandan
    > > >
    > > > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <cks07...@gmail.com>
    > > > wrote:
    > > >
    > > >> Sorry about being cryptic there. What I meant is that it will be much
    > > >> better if we don't make assumptions about the frequency of failures in
    > > >> topologies. I know it is more a matter of common sense, but out of
    > > >> curiosity, can you point me to any Storm documentation which makes a
    > > >> comment on preferable failure rates? I was suggesting that if we can
    > > >> offer the user an optimization through a clean API, the user will be
    > > >> free to decide on the rationale of using it.
    > > >>
    > > >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <
    > > >> ev...@yahoo-inc.com.invalid> wrote:
    > > >>
    > > >>> I'm not sure what assumptions you want to make that this is
    > > >>> preventing, or why they would be helpful.
    > > >>>
    > > >>> - Bobby
    > > >>>
    > > >>>
    > > >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <
    > > >>> cks07...@gmail.com> wrote:
    > > >>>
    > > >>> Hi Stig & Bobby
    > > >>>
    > > >>> Thanks for confirming my understanding.
    > > >>>
    > > >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are
    > > >>> non-blocking has been a guideline on
    > > >>> http://storm.apache.org/releases/1.1.0/Concepts.html
    > > >>> for a long time. Copying verbatim here: "The main method on spouts is
    > > >>> nextTuple. nextTuple either emits a new tuple into the topology or
    > > >>> simply returns if there are no new tuples to emit. It is imperative
    > > >>> that nextTuple does not block for any spout implementation, because
    > > >>> Storm calls all the spout methods on the same thread." I admit that
    > > >>> there is some chance my interpretation is partially incorrect, but I
    > > >>> have been following it in a custom spout till now. Even though the
    > > >>> objective is different, there is a similar hint in the official Kafka
    > > >>> documentation. Please see under heading "2. Decouple Consumption and
    > > >>> Processing" on
    > > >>> https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.
    > > >>> Essentially, a thread polls Kafka and the spout thread gets the
    > > >>> messages through a shared queue. If pre-fetching is present in Kafka
    > > >>> (I will read about it further), I assume we do not have to fetch in
    > > >>> another thread, but I am not sure how the pre-fetching behaves with
    > > >>> re-seeking before every poll.
    > > >>>
    > > >>> 2) @Bobby, you are correct in pointing out what needs to be optimized,
    > > >>> but the facts sometimes prevent us from making assumptions. We do
    > > >>> optimize our retry loop such that we don't poll the messages again. I
    > > >>> especially see problems when this is combined with exponential
    > > >>> back-off. I am not sure how difficult or clean it would be to expose
    > > >>> some sort of configuration to allow such optimization. Do you think it
    > > >>> would be worth trying out something?
    > > >>>
    > > >>> Thanks
    > > >>> Chandan
    > > >>>
    > > >>
    > > >>
    > > >
    > >
    >
    
