There is a set of simple topologies in storm-perf for exactly this kind of benchmarking… we have one there that measures the performance of the old KafkaSpout: https://github.com/apache/storm/blob/1.x-branch/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaSpoutNullBoltTopo.java
We don’t have one yet for the new Kafka spout… I had written one but never got around to contributing it back… https://github.com/roshannaik/storm/blob/perftopos1.x/examples/storm-starter/src/jvm/org/apache/storm/starter/perf/NewKafkaSpoutNullBoltTopo.java

You can copy it into storm-perf and use it as your starting point… the main method that submits the topo there needs to be made more like the ones we already have in official Storm.

-roshan

On 7/10/17, 10:49 PM, "chandan singh" <cks07...@gmail.com> wrote:

Thanks @Stig for the detailed explanation. I have not yet used the KafkaSpout; I was just going through the code to understand it. I will try the fixes too. In the topologies I run, I hardly ever create a scenario where the spout is limiting the throughput. Still, I will try to do some benchmarking using a dummy topology and update you on my experience. Thanks a lot again.

Chandan

On Tue, Jul 11, 2017 at 3:46 AM, Stig Døssing <generalbas....@gmail.com> wrote:

> > It is the spout thread which is polling (consuming) and iterating (processing) over the polled messages. If we separate the consumption in another thread and push messages in a queue, iterating (processing) is now concurrent and decoupled.
>
> Sure, but the bookkeeping done by the spout before emitting the polled messages should be very lightweight, and pushing the messages (and returning acks/fails) through another queue system isn't free. I'm just not convinced that avoiding short-duration blocks by running the consumer in a separate thread has any benefit, but I'd be happy to see benchmarks.
>
> > The auto spout block/wait when no messages are emitted makes absolute sense but is a different scenario, less likely in a mostly loaded topology.
>
> Keep in mind that the consumer blocking on poll is also less likely in a loaded topology, because the block only happens if either Kafka has no more new messages, or the consumer prefetching fails to fetch messages quickly enough.
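The queue-based decoupling being debated here can be sketched with the JDK alone. The following is a minimal, illustrative sketch of the pattern (all class and method names are made up for the example, not Storm API): a dedicated poller thread stands in for the KafkaConsumer, which must stay on one thread, and the "spout" side drains a shared queue without blocking, as nextTuple() requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Minimal JDK-only sketch of "decouple consumption and processing":
 * a poller thread simulates consumer.poll() and pushes record batches
 * into a bounded queue; the "spout" side drains the queue with a
 * non-blocking poll(). Illustrative only, not Storm code.
 */
public class DecoupledPollSketch {

    /** Runs one poll/drain cycle and returns everything the "spout" emitted. */
    static List<String> runOnce() {
        BlockingQueue<List<String>> batches = new ArrayBlockingQueue<>(16);

        // Poller thread: simulates consumer.poll() producing record batches.
        Thread poller = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                List<String> batch = Arrays.asList("rec-" + (2 * i), "rec-" + (2 * i + 1));
                try {
                    batches.put(batch); // blocks only if the spout falls behind
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        poller.start();
        try {
            poller.join(); // makes the sketch deterministic; a real spout would not join
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // "Spout" side: nextTuple() must not block, so drain with non-blocking poll().
        List<String> emitted = new ArrayList<>();
        List<String> batch;
        while ((batch = batches.poll()) != null) {
            emitted.addAll(batch); // stand-in for collector.emit(...)
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // [rec-0, rec-1, rec-2, rec-3, rec-4, rec-5]
    }
}
```

As Stig notes, the hand-off through the queue is not free; whether it beats the occasional short block inside poll() is exactly what a benchmark topology would have to measure.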
> > 1.2 Agreed again. I had taken the spout recommendation about non-blocking calls seriously, taking into account its critical role.
>
> I think it's a good recommendation, but like I said, I understand it to be warning about blocking for long periods. Besides, the user can configure how long a block in poll they will accept for the KafkaSpout, so if the current default timeout (200ms) is causing someone issues, it can always be lowered.
>
> > I am especially thinking of the failure case which, for some reason, is recurrent; the same message which does not get acked.
>
> This case is something the user needs to deal with regardless of how we implement the spout. If a message is repeatedly failing, the choice will be between repeatedly retrying that message until it finally succeeds, or dropping that message and moving on. Either option is something the user can configure via the RetryService.
>
> > We cannot commit unless that failed message is finally acked after a few retries. Because of this, we don't even process any new records.
>
> This behavior is caused by a bug in 1.1.0, which has since been fixed. Once the failed message is fetched and retried, the spout should proceed past the failed message and emit new tuples. The only reasons the spout should stop emitting new tuples are if it hits the cap on uncommitted offsets (configurable, see https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L47), or if the failed message is so far back in the message stream that the spout seeks back to the failed message and then spends a long time fetching and discarding already emitted tuples while catching up to where it left off.
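The poll timeout, retry policy, and uncommitted-offset cap mentioned in this exchange are all exposed on KafkaSpoutConfig. A rough sketch of wiring them together follows; builder and setter names are taken from the 1.x storm-kafka-client API as best I recall and should be verified against your release, and all values are hypothetical:

```java
// Sketch only: exact builder/setter names may differ between releases.
KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), // initial delay
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), // back-off period
    5,                                                                // max retries before dropping
    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));      // cap on the delay

KafkaSpoutConfig<String, String> conf =
    KafkaSpoutConfig.<String, String>builder("broker1:9092", "my-topic")
        .setPollTimeoutMs(50)             // lower than the 200ms default discussed above
        .setMaxUncommittedOffsets(10_000) // the cap referenced in KafkaSpoutConfig.java#L47
        .setRetry(retry)                  // retry-until-success vs. drop is decided here
        .build();
```

The max-retries argument is what turns "repeatedly retrying until it finally succeeds" into "dropping that message and moving on" after a bounded number of attempts.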
> There are bugs in the max uncommitted offsets mechanism right now that can cause a spout to stop retrying tuples (some fixed here: https://issues.apache.org/jira/browse/STORM-2343, some still pending: https://issues.apache.org/jira/browse/STORM-2549; we're working to fix this). I think the behavior you're seeing is caused by part of STORM-2343, which is not fixed in 1.1.0. Basically, in 1.1.0, when there was a failed tuple the spout would always seek to the committed offset for that partition in order to get the failed tuple. This caused some issues relating to being unable to retry messages that were too far from the committed offset, and to unnecessarily polling the messages next to the committed offset when we didn't need to. We've changed it so we seek to the offset we want to retry instead, which avoids these issues. See https://github.com/apache/storm/pull/1924/files#diff-7d7cbc8f5444fa7ada7962033fc31c5eR302 for details. You may want to try checking out 1.x-branch and building the spout yourself to see if this solves your issue.
>
> 2017-07-10 23:08 GMT+02:00 chandan singh <cks07...@gmail.com>:
>
> > Hi Hugo
> >
> > I hope I do not come across as arguing for its own sake.
> >
> > 1.1 Agreed, but I was not suggesting using KafkaConsumer from multiple threads. We can use a single thread which is different from the spout thread. That thread and the spout will share a queue of ConsumerRecords.
> >
> > 1.2 Agreed again. I had taken the spout recommendation about non-blocking calls seriously, taking into account its critical role. I still believe it is a sound recommendation, but maybe there are other factors, like topology throughput, which might be more limiting than the spout making a Kafka consumer poll.
> >
> > 2.1 There is no room for an optimization which affects the normal (non-failure) case negatively. I was just suggesting that we might optimize the failure case without a trade-off.
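The cost difference between the two seek strategies described above can be put in rough numbers. A toy JDK-only calculation (offsets and names are hypothetical, not Storm code):

```java
/**
 * Toy illustration of the seek-strategy change described above.
 * Old behavior (pre-STORM-2343 fix): seek to the committed offset, then
 * fetch and discard everything up to the failed offset. New behavior:
 * seek directly to the offset being retried. All numbers are made up.
 */
public class SeekCostSketch {
    // Records fetched and discarded before reaching the failed offset.
    static long discardedRecords(long seekOffset, long failedOffset) {
        return failedOffset - seekOffset;
    }

    public static void main(String[] args) {
        long committed = 1_000;  // last committed offset (hypothetical)
        long failed = 1_950;     // offset of the tuple being retried

        System.out.println(discardedRecords(committed, failed)); // 950 (old: seek to committed)
        System.out.println(discardedRecords(failed, failed));    // 0   (new: seek to failed offset)
    }
}
```

The further the failed tuple sits from the committed offset, the more the old strategy re-fetches for nothing, which is the "fetching and discarding already emitted tuples" cost mentioned earlier in the thread.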
> > Failure processing is quite independent of successfully acked messages. I am especially thinking of the failure case which, for some reason, is recurrent; the same message which does not get acked. I agree that it is much better to address the issue in the application.
> >
> > 2.2 As you have correctly mentioned, subsequently polled messages (previously acked/emitted) are no longer emitted. We cannot commit unless that failed message is finally acked after a few retries. Because of this, we don't even process any new records. We anyway keep the current set of messages (polled repeatedly) in memory. All we are doing is polling the same set of messages and iterating until that one failure has been rescheduled N times (worse with exponential back-off).
> >
> > Anyway, we are dealing with rare scenarios which should get the least priority, especially if handling them introduces complexity.
> >
> > On a different note, sending arbitrarily (large) sized messages to Kafka is a problem in itself.
> >
> > @Stig I will think about it further and evaluate the need for optimization.
> >
> > Thanks
> > Chandan
> >
> > On Tue, Jul 11, 2017 at 1:37 AM, chandan singh <cks07...@gmail.com> wrote:
> >
> > > 1) Agreed that short-duration blocks are less likely to cause an issue, but it would be much better to avoid them. Anyway, a conclusion is not easy without some benchmarks. I will let you know if I am able to do some volume testing on both options and observe a significant difference.
> > >
> > > The auto spout block/wait when no messages are emitted makes absolute sense but is a different scenario, less likely in a mostly loaded topology.
> > >
> > > You are absolutely correct about decoupling the consumption from the processing when you see it from the perspective of Storm. I think there is a very subtle difference when we keep Kafka in mind.
> > > It is the spout thread which is polling (consuming) and iterating (processing) over the polled messages. If we separate the consumption into another thread and push messages into a queue, iterating (processing) is now concurrent and decoupled. Anyway, I too feel there should not be any significant difference in throughput, but it will be interesting to measure the difference.
> > >
> > > I went overboard in mentioning seek before every poll when I had the failure scenario in mind.
> > >
> > > Thanks a lot. I will keep you guys updated on the switch from the custom spout to KafkaSpout.
> > >
> > > Thanks for the amazing work.
> > >
> > > Chandan
> > >
> > > On Tue, Jul 11, 2017 at 12:56 AM, chandan singh <cks07...@gmail.com> wrote:
> > >
> > >> Sorry about being cryptic there. What I meant is that it would be much better if we didn't make assumptions about failure rates in topologies. I know it is more a matter of common sense, but out of curiosity, can you point me to any Storm documentation which comments on preferable failure rates? I was suggesting that if we offer the user an optimization through a clean API, the user will be free to decide on the rationale for using it.
> > >>
> > >> On Tue, Jul 11, 2017 at 12:06 AM, Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:
> > >>
> > >>> I'm not sure what assumptions you want to make that this is preventing, or why they would be helpful.
> > >>>
> > >>> - Bobby
> > >>>
> > >>> On Monday, July 10, 2017, 12:14:53 PM CDT, chandan singh <cks07...@gmail.com> wrote:
> > >>>
> > >>> Hi Stig & Bobby
> > >>>
> > >>> Thanks for confirming my understanding.
> > >>>
> > >>> 1) Ensuring that calls to nextTuple(), ack() and fail() are non-blocking has long been a guideline on http://storm.apache.org/releases/1.1.0/Concepts.html.
> > >>> Copying verbatim here: "The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread." I admit there is some chance my interpretation is partially incorrect, but I have been following it in a custom spout till now. Even though the objective is different, there is a similar hint in the official Kafka documentation; please see the heading "2. Decouple Consumption and Processing" at https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html. Essentially, a thread polls Kafka and the spout thread gets the messages through a shared queue. If pre-fetching is present in Kafka (I will read about it further), I assume we do not have to fetch in another thread, but I am not sure how the pre-fetching behaves with re-seeking before every poll.
> > >>>
> > >>> 2) @Bobby, you are correct in pointing out what needs to be optimized, but the facts sometimes prevent us from making assumptions. We do optimize our retry loop such that we don't poll the messages again. I especially see problems when combined with exponential back-off. I am not sure how difficult or clean it would be to expose some sort of configuration to allow such an optimization. Do you think it would be worth trying out something?
> > >>>
> > >>> Thanks
> > >>> Chandan
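Since exponential back-off comes up several times in this thread, here is a tiny JDK-only sketch of how such a retry schedule grows; the formula and all numbers are hypothetical, chosen only to mirror the shape of policies like Storm's KafkaSpoutRetryExponentialBackoff, not its actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch (not Storm code): an exponential back-off retry
 * schedule, delay(i) = initialDelayMs * base^i, capped at maxDelayMs.
 * Shows why a recurrently failing tuple is retried at ever longer
 * intervals, stretching out the window in which commits are held back.
 */
public class BackoffSketch {
    static long delayMs(long initialDelayMs, double base, long maxDelayMs, int retryAttempt) {
        double d = initialDelayMs * Math.pow(base, retryAttempt);
        return Math.min((long) d, maxDelayMs);
    }

    public static void main(String[] args) {
        // Attempts 0..5 with initial=100ms, base=2, cap=2000ms.
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < 6; attempt++) {
            delays.add(delayMs(100, 2.0, 2000, attempt));
        }
        System.out.println(delays); // [100, 200, 400, 800, 1600, 2000]
    }
}
```

With N retries under such a schedule, the total time during which the spout keeps re-polling the same in-memory batch grows roughly geometrically until the cap kicks in, which is the "worse with exponential back-off" concern raised above.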