Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api
Rohit, The 30s number sounds very suspicious because it is exactly the value of the session timeout. But if you are driving the consumer correctly, you shouldn't normally hit this timeout. Dana was asking about consumers leaving gracefully because that is one case where you can inadvertently trigger the 30s timeout, requiring *all* group members to wait that long before they decide one of the previous members has ungracefully left the group and they move on without it. It sounds like something you are doing is causing the group to wait for the session timeout. Is it possible any of your processes are exiting without calling consumer.close()? Or that any of your processes are not calling consumer.poll() within the session timeout of 30s? This can sometimes happen if they receive too much data and take too long to process it (0.10 introduced max.poll.records to help users control this, and we're making further refinements to the consumer to provide better application control over number of messages fetched vs total processing time). -Ewen On Sun, Jun 19, 2016 at 10:01 PM, Rohit Sardesai wrote: > > Can anybody help out on this? > > From: Rohit Sardesai > Sent: 19 June 2016 11:47:01 > To: users@kafka.apache.org > Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer > api > > > In my tests, I am using around 24 consumer groups. I never call > consumer.close() or consumer.unsubscribe() until the application is > shutting down. > > So the consumers never leave, but new consumer instances do get created as > the parallel requests pile up. Also, I am reusing consumer instances > > if they are idle (i.e. not serving any consume request). So with 9 > partitions, I do 9 consume requests in parallel every second > under the same consumer group. > > So to summarize, I have the following test setup: 3 Kafka brokers, 2 > zookeeper nodes, 1 topic, 9 partitions, 24 consumer groups and 9 consume > requests at a time. 
> > > > From: Dana Powers > Sent: 19 June 2016 10:45 > To: users@kafka.apache.org > Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer > api > > Is your test reusing a group name? And if so, are your consumer instances > gracefully leaving? This may cause subsequent 'rebalance' operations to > block until those old consumers check in or the session timeout happens > (30secs) > > -Dana > On Jun 18, 2016 8:56 PM, "Rohit Sardesai" > wrote: > > > I am using the group management feature of Kafka 0.9 to handle partition > > assignment to consumer instances. I use the subscribe() API to subscribe to > > the topic I am interested in reading data from. I have an environment > > where I have 3 Kafka brokers with a couple of Zookeeper nodes. I created > > a topic with 9 partitions. The performance tests attempt to send 9 > > parallel poll() requests to the Kafka brokers every second. The results > > show that each poll() operation takes around 30 seconds for the first time > > it polls and returns 0 records. Also, when I print the partition > > assignment to this consumer instance, I see no partitions assigned to it. > > The next poll() does return quickly (~10-20 ms) with data and some > > partitions assigned to it. > > > > With each consumer taking 30 seconds, the performance tests report very > > low throughput since I run the tests for around 1000 seconds, out of which I > > produce messages on the topic for the complete duration and I start the > > parallel consume requests after 400 seconds. So out of 400 seconds, with 9 > > consumers taking 30 seconds each, around 270 seconds are spent in the > > first poll without any data. Is this because of the re-balance operation > > that the consumers are blocked on the poll()? What is the best way to use > > poll() if I have to serve many parallel requests per second? Should I > > prefer manual assignment of partitions in this case instead of relying on > > re-balance? 
> > > > > > Regards, > > > > Rohit Sardesai > > > > > -- Thanks, Ewen
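A sketch of the consumer settings discussed in this thread; names follow the 0.9/0.10 Java consumer, and the values shown reflect the defaults being hit, not tuned recommendations:

```properties
# session.timeout.ms: how long the coordinator waits before declaring a
# member dead. The 0.9 default is 30000 ms, which matches the 30s stall
# described above when a member leaves without calling consumer.close().
session.timeout.ms=30000
# max.poll.records (available from 0.10): caps the records returned by each
# poll() so per-batch processing stays well inside the session timeout.
max.poll.records=500
```

If a process cannot call poll() again within session.timeout.ms, the group rebalances and every member can block for up to that long.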
Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api
Can anybody help out on this? From: Rohit Sardesai Sent: 19 June 2016 11:47:01 To: users@kafka.apache.org Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api In my tests, I am using around 24 consumer groups. I never call consumer.close() or consumer.unsubscribe() until the application is shutting down. So the consumers never leave, but new consumer instances do get created as the parallel requests pile up. Also, I am reusing consumer instances if they are idle (i.e. not serving any consume request). So with 9 partitions, I do 9 consume requests in parallel every second under the same consumer group. So to summarize, I have the following test setup: 3 Kafka brokers, 2 zookeeper nodes, 1 topic, 9 partitions, 24 consumer groups and 9 consume requests at a time. From: Dana Powers Sent: 19 June 2016 10:45 To: users@kafka.apache.org Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api Is your test reusing a group name? And if so, are your consumer instances gracefully leaving? This may cause subsequent 'rebalance' operations to block until those old consumers check in or the session timeout happens (30secs) -Dana On Jun 18, 2016 8:56 PM, "Rohit Sardesai" wrote: > I am using the group management feature of Kafka 0.9 to handle partition > assignment to consumer instances. I use the subscribe() API to subscribe to > the topic I am interested in reading data from. I have an environment > where I have 3 Kafka brokers with a couple of Zookeeper nodes. I created > a topic with 9 partitions. The performance tests attempt to send 9 > parallel poll() requests to the Kafka brokers every second. The results > show that each poll() operation takes around 30 seconds for the first time > it polls and returns 0 records. Also, when I print the partition > assignment to this consumer instance, I see no partitions assigned to it. 
> The next poll() does return quickly (~10-20 ms) with data and some > partitions assigned to it. > > With each consumer taking 30 seconds, the performance tests report very > low throughput since I run the tests for around 1000 seconds, out of which I > produce messages on the topic for the complete duration and I start the > parallel consume requests after 400 seconds. So out of 400 seconds, with 9 > consumers taking 30 seconds each, around 270 seconds are spent in the > first poll without any data. Is this because of the re-balance operation > that the consumers are blocked on the poll()? What is the best way to use > poll() if I have to serve many parallel requests per second? Should I > prefer manual assignment of partitions in this case instead of relying on > re-balance? > > > Regards, > > Rohit Sardesai > >
Re: Fail fast producer/consumer when no connection to Kafka brokers cluster
You can adjust request.timeout.ms, which is shared between both the new producer and the new consumer. I don't think it's quite what you want, but it's probably the closest that exists across both clients. There's not much more than that -- when you say "when the connection to the entire broker cluster is lost", that's not really something that we'd expect to happen; you might lose connectivity to some brokers, but even determining that the connection "to the entire broker cluster" is lost is not something you can easily do (and it often requires long timeouts to determine anyway). -Ewen On Fri, Jun 17, 2016 at 3:27 AM, Spico Florin wrote: > Hello! > I would like to know the configurations/properties for the > producer/consumer in order to fail fast when the connection to the entire > broker cluster is lost. > For example, can we set a parameter so that when the connection retries > reach a threshold, the client disconnects and throws an exception? > > I look forward to your answers. > Regards, > Florin > -- Thanks, Ewen
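For reference, the setting Ewen mentions, shared by the new producer and consumer; the value below is only an illustrative assumption for failing faster, not a recommendation:

```properties
# Fail in-flight requests sooner when brokers stop responding.
# The stock default is on the order of 30-40 seconds depending on the client.
request.timeout.ms=10000
```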
Re: [DISCUSS] Java 8 as a minimum requirement
Hi Ismael, Agreed that timing is more important. If we give enough of a heads-up to the users who are on Java 7, that's great, but shipping this in the 0.10.x line still won't be good, as it is perceived as a maintenance release even if it contains a lot of features. If we can make this part of 0.11, move the 0.10.1 features to 0.11, and give a rough timeline for when that would be released, that would be ideal. Thanks, Harsha On Fri, Jun 17, 2016, at 11:13 AM, Ismael Juma wrote: > Hi Harsha, > > Comments below. > > On Fri, Jun 17, 2016 at 7:48 PM, Harsha wrote: > > > Hi Ismael, > > "Are you saying that you are aware of many Kafka users still > > using Java 7 > > > who would be ready to upgrade to the next Kafka feature release (whatever > > > that version number is) before they can upgrade to Java 8?" > > I know there are quite a few users who are still on Java 7 > > > This is good to know. > > > > and regarding the > > upgrade, we can't say yes or no. It's up to the user's discretion when they > > choose to upgrade, and of course whether there are any critical fixes that > > might go into the release. We shouldn't be restricting their upgrade > > path just because we removed Java 7 support. > > > > My point is that both paths have their pros and cons and we need to weigh > them up. If some users are slow to upgrade the Java version (Java 7 has > been EOL'd for over a year), there's a good chance that they are slow to > upgrade Kafka too. And if that is the case (and it may not be), then > holding up improvements for the ones who actually do upgrade may be the > wrong call. To be clear, I am still in listening mode and I haven't made > up > my mind on the subject. > > Once we released 0.9.0 there weren't any 0.8.x releases, i.e. we don't > > have an LTS-type release where we continually ship critical fixes over > > 0.8.x minor releases. So if a user needs a critical fix, the only > > option today is to upgrade to the next version where that fix is shipped. 
> > > > We haven't done a great job at this in the past, but there is no policy that once a new major release is out, we don't do patch releases for the > previous major release. In fact, we have been collecting critical fixes > in > the 0.9.0 branch for a potential 0.9.0.2. > > I understand there is no decision made yet, but the premise was to > > ship this in 0.10.x, possibly 0.10.1, which I don't agree with. I am in > > general against shipping this in a 0.10.x version. Removing Java 7 support > > in a minor release is in general not a good idea for users. > > > > Sorry if I didn't communicate this properly. I simply meant the next > feature release. I used 0.10.1.0 as an example, but it could also be > 0.11.0.0 if that turns out to be the next release. A discussion on that > will probably take place once the scope is clear. Personally, I think the > timing is more important than the version number, but it seems like some > people disagree. > > Ismael
Re: Kafka Connect HdfsSink and the Schema Registry
Great, glad you sorted it out. If the namespace is being omitted incorrectly from the request the connector is making, please file a bug report -- I can't think of a reason we'd omit that, but it's certainly possible it is a bug on our side. -Ewen On Wed, Jun 15, 2016 at 7:08 AM, Tauzell, Dave wrote: > Thanks Ewen, > > The second request was made by me directly. I'm trying to add this > functionality into my .Net application. The library I'm using doesn't have > an implementation of the AvroSerializer that interacts with the schema > registry. I've now added in code to make a POST to > /subjects/-value with the schema. Something I noticed is that I > was using a schema like this: > > { > "subject": "AuditHdfsTest5-value", > "version": 1, > "id": 5, > "schema": > "{\"type\":\"record\",\"name\":\"GenericAuditRecord\",\"namespace\":\"audit\",\"fields\":[{\"name\":\"xml\",\"type\":[\"string\",\"null\"]}]}" > } > > When the connector got a message and did a lookup, it didn't have the > "namespace" field and the lookup failed. I then posted a new version of > the schema without the "namespace" field and it worked. > > -Dave > > Dave Tauzell | Senior Software Engineer | Surescripts > O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com > Connect with us: Twitter I LinkedIn I Facebook I YouTube > > > -Original Message- > From: Ewen Cheslack-Postava [mailto:e...@confluent.io] > Sent: Tuesday, June 14, 2016 6:59 PM > To: users@kafka.apache.org > Subject: Re: Kafka Connect HdfsSink and the Schema Registry > > On Tue, Jun 14, 2016 at 8:08 AM, Tauzell, Dave < > dave.tauz...@surescripts.com > > wrote: > > > I have been able to get my C# client to put avro records to a Kafka > > topic and have the HdfsSink read and save them in files. I am > > confused about interaction with the registry. The kafka message > > contains a schema id and I see the connector look that up in the > > registry. Then it also looks up a subject which is -value. 
> > > > What is the relationship between the passed schema id and the subject > > which is derived from the topic name? > > > > The HDFS connector doesn't work directly with the schema registry; the > AvroConverter does. I'm not sure what the second request you're seeing is > -- normally it would only look up the schema ID in order to get the schema. > Where are you seeing the second request, and can you include some logs? I > can't think of any other requests the AvroConverter would be making just > for deserialization. > > The subject names are generated in the serializer as -key and > -value, and this is just the standardized approach Confluent's > serializers use. The ID will have been registered under that subject. > > -Ewen > > > > > > -Dave > > > > This e-mail and any files transmitted with it are confidential, may > > contain sensitive information, and are intended solely for the use of > > the individual or entity to whom they are addressed. If you have > > received this e-mail in error, please notify the sender by reply > > e-mail immediately and destroy all copies of the e-mail and any > attachments. > > > > > > -- > Thanks, > Ewen > -- Thanks, Ewen
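For anyone reproducing this from a non-Java client, a minimal sketch of how Confluent's serializers derive the subject name and what the registration payload looks like. The topic and schema are taken from the thread; the helper names are hypothetical, and the registration endpoint (POST /subjects/<subject>/versions) is the standard registry REST API:

```python
import json

def subject_for(topic: str, is_key: bool = False) -> str:
    # Confluent's serializers register value schemas under "<topic>-value"
    # and key schemas under "<topic>-key".
    return f"{topic}-{'key' if is_key else 'value'}"

def registration_payload(schema: dict) -> str:
    # The registry expects {"schema": "<the Avro schema as an escaped JSON string>"}.
    return json.dumps({"schema": json.dumps(schema)})

# Schema from the thread above; dropping "namespace" makes it a *different*
# schema as far as the registry is concerned, which is why the lookup failed.
schema = {
    "type": "record",
    "name": "GenericAuditRecord",
    "namespace": "audit",
    "fields": [{"name": "xml", "type": ["string", "null"]}],
}

subject = subject_for("AuditHdfsTest5")
payload = registration_payload(schema)
# Register with: POST http://<registry>/subjects/<subject>/versions
print(subject)
```

The key point is that the schema string must match byte-for-byte semantics, including the namespace, for the connector's lookup to resolve to the same registered schema.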
Re: General Question About Kafka
The most common use case for Kafka is within a data center, but you can absolutely produce data across the WAN. You may need to adjust some settings (e.g. timeouts, max in-flight requests per connection if you want high throughput) to account for operating over the WAN, but you can definitely do it. -Ewen On Wed, Jun 15, 2016 at 12:02 AM, ali wrote: > Hello Guys. > > > > We are going to install Apache Kafka in our local data center, and different > > producers which are distributed across different locations will be > connected > to this server. > > Our producers will use an Internet connection and will send 10 MB data > packages every 30 seconds. > > I was wondering, is Apache Kafka actually suited for my scenario? Since we > will use an Internet connection, > > should I be worried about network-related problems such as > performance and latency? > > > > Thank you > > Ali > > -- Thanks, Ewen
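A sketch of producer settings commonly revisited for WAN links, per Ewen's note above; the values are illustrative assumptions, not tested recommendations:

```properties
# Allow for higher WAN round-trip times before timing out a request.
request.timeout.ms=60000
# More in-flight requests per connection can keep throughput up over
# high-latency links (at the cost of possible reordering when retrying).
max.in.flight.requests.per.connection=10
# Retry transient network failures instead of surfacing them immediately.
retries=3
```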
Re: Consumer Question
Hi Chris, We should also ensure that auto.create.topics.enable is set to true. Thank you, Anirudh Hi Chris, If the topic does not exist, a new topic will be created with the name you give. Thanks, Nicole On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock wrote: > If you have a consumer listening on a topic and that topic is deleted, is > the consumer made aware -- perhaps by some exception -- or does it > continue listening, blissfully unaware that it will never hear anything > more? > > Thanks, > > Chris > > >
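The broker-side setting Anirudh refers to; note it also means a deleted topic silently comes back the next time a client touches it:

```properties
# server.properties (broker): create topics automatically on first access.
auto.create.topics.enable=true
```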
Re: Consumer Question
Hi Chris, If the topic does not exist, a new topic will be created with the name you give. Thanks, Nicole On Sat, Jun 18, 2016 at 1:55 AM, Chris Barlock wrote: > If you have a consumer listening on a topic and that topic is deleted, is > the consumer made aware -- perhaps by some exception -- or does it > continue listening, blissfully unaware that it will never hear anything > more? > > Thanks, > > Chris > > >
Re: Wordcount with reduce
Hi Matthias, I solved the problem by specifying the serdes and reading the source as a KStream instead of a KTable. So, instead of KTable<Long, String> source = builder.table("topic1"); I added: KStream<Long, String> source = builder.stream(longSerde, stringSerde, "topic1"); Thanks -Adrienne On Sun, Jun 19, 2016 at 4:11 PM, Matthias J. Sax wrote: > Can you show the full stack trace? > > How do you ingest the data into the topic? I also think you should read > the topic as a KStream (instead of a KTable). > > What de-/serializers do you specify in props? (see > > http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes > ) > > > -Matthias > > On 06/19/2016 03:06 PM, Adrienne Kole wrote: > > Hi, > > > > I want to implement the wordcount example with the reduce function in KTable. > > However, I get the error: > > > > Exception in thread "StreamThread-1" > > org.apache.kafka.common.errors.SerializationException: Size of data > > received by LongDeserializer is not 8 > > > > > > Here is my code: > > > > > > KTable<Long, String> source = builder.table("topic1"); // here we > > have WordID and Word itself > > > > KTable<String, Long> counts = source.reduce(new Reducer<Long>() { > > > > @Override > > public Long apply(Long value1, Long value2) { > > // TODO Auto-generated method stub > > return value1+value2; > > } > > }, > > > > new Reducer<Long>() { > > > > @Override > > public Long apply(Long value1, Long value2) { > > // TODO Auto-generated method stub > > return value1-value2; > > } > > } > > > > , new KeyValueMapper<Long, String, KeyValue<String, Long>>() { > > > > @Override > > public KeyValue<String, Long> apply(Long key, String value) { > > // TODO Auto-generated method stub > > return new KeyValue<String, Long>(value, new Long(1)); > > } > > }, stringSerde, longSerde, "count"); > > > > counts.to(Serdes.String(), Serdes.Long(), "topic2"); > > > > KafkaStreams streams = new KafkaStreams(builder, props); > > streams.start(); > > > > > > Moreover, I think the error messages should be more informative to better > > deal with such situations. > > > > > > > > - Adrienne > > > >
Re: Can I access Kafka Streams Key/Value store outside of Processor?
Hi Yi, Your observation about accessing the state stores that are already there vs. keeping state outside of Kafka Streams is a good one. We are currently working on making the state stores accessible as you mention and should be able to share some design docs shortly. Thanks, Eno > On 19 Jun 2016, at 19:49, Yi Chen wrote: > > Hello, > > I am thinking of using the Kafka Streams feature to "unify" our real-time > and scheduled workflows. An example is that in our workflow with stages A--> > B --> C, the A --> B segment can be achieved in real-time, but the B-->C > segment is usually done with a scheduled job, running maybe once per hour > or once per 5 minutes, etc. > > I am hoping to model this using Kafka Streams. Each stage would be a topic: > Kafka Streams will process real-time events in topic-A and send the result > to topic-B. The challenge is that when I process the events in topic-B, I want > to be able to process each event with a crontab-like schedule, so that if > the processing is successful (by checking an external API) the event is sent > to topic-C; otherwise, we will re-process the event again according to the > schedule. > > Can I use the RocksDB key/value state store to store the topic-B events > that failed to process, and have a scheduler (like the Quartz scheduler) > iterate all events in the store and re-process them again? I know I can always > keep the state outside of Kafka, but I like that the state store is > fault-tolerant and can be rebuilt automatically if the instance fails. The > examples I found so far seem to imply that the state store is only > accessible from within a processor. > > Thanks, > Yi
Can I access Kafka Streams Key/Value store outside of Processor?
Hello, I am thinking of using the Kafka Streams feature to "unify" our real-time and scheduled workflows. An example is that in our workflow with stages A--> B --> C, the A --> B segment can be achieved in real-time, but the B-->C segment is usually done with a scheduled job, running maybe once per hour or once per 5 minutes, etc. I am hoping to model this using Kafka Streams. Each stage would be a topic: Kafka Streams will process real-time events in topic-A and send the result to topic-B. The challenge is that when I process the events in topic-B, I want to be able to process each event with a crontab-like schedule, so that if the processing is successful (by checking an external API) the event is sent to topic-C; otherwise, we will re-process the event again according to the schedule. Can I use the RocksDB key/value state store to store the topic-B events that failed to process, and have a scheduler (like the Quartz scheduler) iterate all events in the store and re-process them again? I know I can always keep the state outside of Kafka, but I like that the state store is fault-tolerant and can be rebuilt automatically if the instance fails. The examples I found so far seem to imply that the state store is only accessible from within a processor. Thanks, Yi
kafka + logstash
Hello all, I use the kafka input and kafka output plugins in logstash. I have high CPU usage; what can I do to reduce it? Thanks a lot
kafka + logstash
Hello all, I use the kafka input and kafka output plugins in logstash. I have high CPU usage; what can I do to reduce it? logstash version 2.3.2 logstash-input-kafka 2.0.8 logstash-output-kafka 2.0.5 Thanks a lot
Re: Error closing Socket for ...
Looks like the producers lose the connection to the brokers. Do the brokers have enough resources to handle all the producers? Does the network support that throughput? On Sun, 19 Jun 2016, 17:27 Avi Asulin wrote: > Hi > We are using kafka 0.8.2 with scala 2.10 version > We currently have 3 brokers and we are working with ~ 170 producers > We frequently get the error > > ERROR Closing socket for /170.144.181.50 because of error > (kafka.network.Processor) > java.io.IOException: Connection reset by peer > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:197) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384) > at kafka.utils.Utils$.read(Utils.scala:380) > at > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Processor.read(SocketServer.scala:444) > at kafka.network.Processor.run(SocketServer.scala:340) > at java.lang.Thread.run(Thread.java:745) > > We get the error on many producer IPs. > Can someone explain what can cause this error and what can be done to get > rid of it? > > Thanks > Avi >
Error closing Socket for ...
Hi We are using kafka 0.8.2 with scala 2.10 version We currently have 3 brokers and we are working with ~ 170 producers We frequently get the error ERROR Closing socket for /170.144.181.50 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) We get the error on many producer IPs. Can someone explain what can cause this error and what can be done to get rid of it? Thanks Avi
Re: Wordcount with reduce
Can you show the full stack trace? How do you ingest the data into the topic? I also think you should read the topic as a KStream (instead of a KTable). What de-/serializers do you specify in props? (see http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes) -Matthias On 06/19/2016 03:06 PM, Adrienne Kole wrote: > Hi, > > I want to implement the wordcount example with the reduce function in KTable. > However, I get the error: > > Exception in thread "StreamThread-1" > org.apache.kafka.common.errors.SerializationException: Size of data > received by LongDeserializer is not 8 > > > Here is my code: > > > KTable<Long, String> source = builder.table("topic1"); // here we > have WordID and Word itself > > KTable<String, Long> counts = source.reduce(new Reducer<Long>() { > > @Override > public Long apply(Long value1, Long value2) { > // TODO Auto-generated method stub > return value1+value2; > } > }, > > new Reducer<Long>() { > > @Override > public Long apply(Long value1, Long value2) { > // TODO Auto-generated method stub > return value1-value2; > } > } > > , new KeyValueMapper<Long, String, KeyValue<String, Long>>() { > > @Override > public KeyValue<String, Long> apply(Long key, String value) { > // TODO Auto-generated method stub > return new KeyValue<String, Long>(value, new Long(1)); > } > }, stringSerde, longSerde, "count"); > > counts.to(Serdes.String(), Serdes.Long(), "topic2"); > > KafkaStreams streams = new KafkaStreams(builder, props); > streams.start(); > > > Moreover, I think the error messages should be more informative to better > deal with such situations. > > > > - Adrienne >
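Matthias's pointer about serdes can be sketched as a Streams config fragment; the property names below follow the 0.10.0 StreamsConfig and should be checked against the linked developer guide:

```properties
# Default serdes for the Streams application. If these don't match what was
# written to the topic, deserialization fails with errors like
# "Size of data received by LongDeserializer is not 8".
key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde
value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
```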
Wordcount with reduce
Hi, I want to implement the wordcount example with the reduce function in KTable. However, I get the error: Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8 Here is my code: KTable<Long, String> source = builder.table("topic1"); // here we have WordID and Word itself KTable<String, Long> counts = source.reduce(new Reducer<Long>() { @Override public Long apply(Long value1, Long value2) { // TODO Auto-generated method stub return value1+value2; } }, new Reducer<Long>() { @Override public Long apply(Long value1, Long value2) { // TODO Auto-generated method stub return value1-value2; } }, new KeyValueMapper<Long, String, KeyValue<String, Long>>() { @Override public KeyValue<String, Long> apply(Long key, String value) { // TODO Auto-generated method stub return new KeyValue<String, Long>(value, new Long(1)); } }, stringSerde, longSerde, "count"); counts.to(Serdes.String(), Serdes.Long(), "topic2"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); Moreover, I think the error messages should be more informative to better deal with such situations. - Adrienne
Re: test of producer's delay and consumer's delay
@Jun Rao, about this question, can you give me some suggestions? > On 18 Jun 2016, at 11:26 AM, Kafka wrote: > > hello, I have done a series of tests on kafka 0.9.0, and one of the results > confused me. > > test environment: > kafka cluster: 3 brokers, 8-core cpu / 8g mem / 1g netcard > client: 4-core cpu / 4g mem > topic: 6 partitions, 2 replicas > > total messages: 1 > single message size: 1024 bytes > fetch.min.bytes: 1 > fetch.wait.max.ms: 100ms > > All send tests are run using the Scala sync interface. > > when I set ack to 0, the producer's delay is 0.3ms, the consumer's delay is > 7.7ms > when I set ack to 1, the producer's delay is 1.6ms, the consumer's delay is > 3.7ms > when I set ack to -1, the producer's delay is 3.5ms, the consumer's delay is > 4.2ms > > but why is the consumer's delay decreased when I set ack from 0 to 1? It confuses > me. >
Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api
In my tests, I am using around 24 consumer groups. I never call consumer.close() or consumer.unsubscribe() until the application is shutting down. So the consumers never leave, but new consumer instances do get created as the parallel requests pile up. Also, I am reusing consumer instances if they are idle (i.e. not serving any consume request). So with 9 partitions, I do 9 consume requests in parallel every second under the same consumer group. So to summarize, I have the following test setup: 3 Kafka brokers, 2 zookeeper nodes, 1 topic, 9 partitions, 24 consumer groups and 9 consume requests at a time. From: Dana Powers Sent: 19 June 2016 10:45 To: users@kafka.apache.org Subject: Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api Is your test reusing a group name? And if so, are your consumer instances gracefully leaving? This may cause subsequent 'rebalance' operations to block until those old consumers check in or the session timeout happens (30secs) -Dana On Jun 18, 2016 8:56 PM, "Rohit Sardesai" wrote: > I am using the group management feature of Kafka 0.9 to handle partition > assignment to consumer instances. I use the subscribe() API to subscribe to > the topic I am interested in reading data from. I have an environment > where I have 3 Kafka brokers with a couple of Zookeeper nodes. I created > a topic with 9 partitions. The performance tests attempt to send 9 > parallel poll() requests to the Kafka brokers every second. The results > show that each poll() operation takes around 30 seconds for the first time > it polls and returns 0 records. Also, when I print the partition > assignment to this consumer instance, I see no partitions assigned to it. 
> > With each consumer taking 30 seconds, the performance tests report very > low throughput since I run the tests for around 1000 seconds, out of which I > produce messages on the topic for the complete duration and I start the > parallel consume requests after 400 seconds. So out of 400 seconds, with 9 > consumers taking 30 seconds each, around 270 seconds are spent in the > first poll without any data. Is this because of the re-balance operation > that the consumers are blocked on the poll()? What is the best way to use > poll() if I have to serve many parallel requests per second? Should I > prefer manual assignment of partitions in this case instead of relying on > re-balance? > > > Regards, > > Rohit Sardesai > >