Hi Jason,

Thanks for investigating. Indeed we do have probably more than the usual
number of partitions. Our use case is such that we have many partitions
(128 - 256) overall but very few messages per second on each partition.

I have created a JIRA at https://issues.apache.org/jira/browse/KAFKA-3159.

Let me know if I can provide more details or help in any other way.

Thanks again,
Rajiv

On Wed, Jan 27, 2016 at 9:27 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Thanks for the detailed report. Can you go ahead and create a JIRA? I do
> see the exceptions locally, but not nearly at the rate that you're
> reporting. That might be a factor of the number of partitions, so I'll do
> some investigation.
>
> -Jason
>
> On Wed, Jan 27, 2016 at 8:40 AM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Hi Guozhang,
> >
> > The Github link I pasted was from the 0.9.0 branch. The same line seems
> to
> > be throwing exceptions in my code built of the maven 0.9.0.0 package. Are
> > you saying that something else has changed higher up the call stack that
> > will probably not trigger so many exceptions ?
> >
> > Thanks,
> > Rajiv
> >
> > On Tue, Jan 26, 2016 at 10:44 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Rajiv,
> > >
> > > Could you try to build the new consumer from 0.9.0 branch and see if
> the
> > > issue can be re-produced?
> > >
> > > Guozhang
> > >
> > > On Mon, Jan 25, 2016 at 9:46 PM, Rajiv Kurian <ra...@signalfx.com>
> > wrote:
> > >
> > > > The exception seems to be thrown here
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236
> > > >
> > > > Is this not expected to hit often?
> > > >
> > > > On Mon, Jan 25, 2016 at 9:22 PM, Rajiv Kurian <ra...@signalfx.com>
> > > wrote:
> > > >
> > > > > Wanted to add that we are not using auto commit since we use custom
> > > > > partition assignments. In fact we never call
> consumer.commitAsync()
> > or
> > > > > consumer.commitSync() calls. My assumption is that since we store
> our
> > > own
> > > > > offsets these calls are not necessary. Hopefully this is not
> > > responsible
> > > > > for the poor performance.
> > > > >
> > > > > On Mon, Jan 25, 2016 at 9:20 PM, Rajiv Kurian <ra...@signalfx.com>
> > > > wrote:
> > > > >
> > > > >> We are using the new kafka consumer with the following config (as
> > > logged
> > > > >> by kafka)
> > > > >>
> > > > >> metric.reporters = []
> > > > >>
> > > > >>         metadata.max.age.ms = 300000
> > > > >>
> > > > >>         value.deserializer = class
> > > > >> org.apache.kafka.common.serialization.ByteArrayDeserializer
> > > > >>
> > > > >>         group.id = myGroup.id
> > > > >>
> > > > >>         partition.assignment.strategy = [org.apache.kafka.clients.
> > > > >> consumer.RangeAssignor]
> > > > >>
> > > > >>         reconnect.backoff.ms = 50
> > > > >>
> > > > >>         sasl.kerberos.ticket.renew.window.factor = 0.8
> > > > >>
> > > > >>         max.partition.fetch.bytes = 2097152
> > > > >>
> > > > >>         bootstrap.servers = [myBrokerList]
> > > > >>
> > > > >>         retry.backoff.ms = 100
> > > > >>
> > > > >>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > > >>
> > > > >>         sasl.kerberos.service.name = null
> > > > >>
> > > > >>         sasl.kerberos.ticket.renew.jitter = 0.05
> > > > >>
> > > > >>         ssl.keystore.type = JKS
> > > > >>
> > > > >>         ssl.trustmanager.algorithm = PKIX
> > > > >>
> > > > >>         enable.auto.commit = false
> > > > >>
> > > > >>         ssl.key.password = null
> > > > >>
> > > > >>         fetch.max.wait.ms = 1000
> > > > >>
> > > > >>         sasl.kerberos.min.time.before.relogin = 60000
> > > > >>
> > > > >>         connections.max.idle.ms = 540000
> > > > >>
> > > > >>         ssl.truststore.password = null
> > > > >>
> > > > >>         session.timeout.ms = 30000
> > > > >>
> > > > >>         metrics.num.samples = 2
> > > > >>
> > > > >>         client.id =
> > > > >>
> > > > >>         ssl.endpoint.identification.algorithm = null
> > > > >>
> > > > >>         key.deserializer = class sf.kafka.VoidDeserializer
> > > > >>
> > > > >>         ssl.protocol = TLS
> > > > >>
> > > > >>         check.crcs = true
> > > > >>
> > > > >>         request.timeout.ms = 40000
> > > > >>
> > > > >>         ssl.provider = null
> > > > >>
> > > > >>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > > >>
> > > > >>         ssl.keystore.location = null
> > > > >>
> > > > >>         heartbeat.interval.ms = 3000
> > > > >>
> > > > >>         auto.commit.interval.ms = 5000
> > > > >>
> > > > >>         receive.buffer.bytes = 32768
> > > > >>
> > > > >>         ssl.cipher.suites = null
> > > > >>
> > > > >>         ssl.truststore.type = JKS
> > > > >>
> > > > >>         security.protocol = PLAINTEXT
> > > > >>
> > > > >>         ssl.truststore.location = null
> > > > >>
> > > > >>         ssl.keystore.password = null
> > > > >>
> > > > >>         ssl.keymanager.algorithm = SunX509
> > > > >>
> > > > >>         metrics.sample.window.ms = 30000
> > > > >>
> > > > >>         fetch.min.bytes = 512
> > > > >>
> > > > >>         send.buffer.bytes = 131072
> > > > >>
> > > > >>         auto.offset.reset = earliest
> > > > >>
> > > > >>
> > > > >> We use the consumer.assign() feature to assign a list of
> partitions
> > > and
> > > > >> call poll in a loop.  We have the following setup:
> > > > >>
> > > > >> 1. The messages have no key and we use the byte array deserializer
> > to
> > > > get
> > > > >> byte arrays from the config.
> > > > >>
> > > > >> 2. The messages themselves are on an average about 75 bytes. We
> get
> > > this
> > > > >> number by diving the Kafka broker bytes-in metric by the
> messages-in
> > > > metric.
> > > > >>
> > > > >> 3. Each consumer is assigned about 64 partitions of the same topic
> > > > spread
> > > > >> across three brokers.
> > > > >>
> > > > >> 4. We get very few messages per second maybe around 1-2 messages
> > > across
> > > > >> all partitions on a client right now.
> > > > >>
> > > > >> 5. We have no compression on the topic.
> > > > >>
> > > > >> Our run loop looks something like this
> > > > >>
> > > > >> while (isRunning()) {
> > > > >>
> > > > >> ConsumerRecords<Void, byte[]> records = null;
> > > > >>
> > > > >>         try {
> > > > >>
> > > > >>             // Here timeout is about 10 seconds, so it is pretty
> > big.
> > > > >>
> > > > >>             records = consumer.poll(timeout);
> > > > >>
> > > > >>         } catch (Exception e) {
> > > > >>
> > > > >>             logger.error("Exception polling Kafka ", e);
> > > > >>
> > > > >>             records = null;
> > > > >>
> > > > >>         }
> > > > >>
> > > > >>         if (records != null) {
> > > > >>
> > > > >>             for (ConsumerRecord<Void, byte[]> record : records) {
> > > > >>
> > > > >>                // The handler puts the byte array on a very fast
> > ring
> > > > >> buffer so it barely takes any time.
> > > > >>
> > > > >>
> > >  handler.handleMessage(ByteBuffer.wrap(record.value()));
> > > > >>
> > > > >>             }
> > > > >>
> > > > >>         }
> > > > >>
> > > > >> }
> > > > >>
> > > > >>
> > > > >>
> > > > >> With this setup our performance has taken a horrendous hit as soon
> > as
> > > we
> > > > >> started this one thread that just polls kafka in a loop.
> > > > >>
> > > > >> I profiled the application using Java Mission Control and have a
> few
> > > > >> insights.
> > > > >>
> > > > >> 1. There doesn't seem to be a single hotspot. The consumer just
> ends
> > > up
> > > > >> using a lot of CPU for handing such a low number of messages. Our
> > > > process
> > > > >> was using 16% CPU before we added a single consumer and it went to
> > 25%
> > > > and
> > > > >> above after. That's an increase of over 50% from a single consumer
> > > > getting
> > > > >> a single digit number of small messages per second. Here is an
> > > > attachment
> > > > >> of the cpu usage breakdown in the consumer (the namespace is
> > different
> > > > >> because we shade the kafka jar before using it) -
> > > > >> http://imgur.com/tHjdVnM  We've used bigger timeouts (100 seconds
> > > odd)
> > > > >> and that doesn't seem to make much of a difference either.
> > > > >>
> > > > >> 2. It also seems like Kafka throws a ton of EOFExceptions. I am
> not
> > > sure
> > > > >> whether this is expected but this seems like it would completely
> > kill
> > > > >> performance. Here is the exception tab of Java mission control.
> > > > >> http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period
> > of 3
> > > > >> minutes which is about 10 thousand exceptions per second! The
> > > exception
> > > > >> stack trace shows that it originates from the poll call. I don't
> > > > understand
> > > > >> how it can throw so many exceptions given I call poll it with a
> > > timeout
> > > > of
> > > > >> 10 seconds and get messages at about 1 per second.
> > > > >>
> > > > >> 3. The single thread seems to allocate a lot too. The single
> thread
> > is
> > > > >> responsible for 17.87% of our entire JVM allocation rate. Most of
> > what
> > > > it
> > > > >> allocates seems to be those same EOFExceptions. Here is a chart
> > > showing
> > > > the
> > > > >> single thread's allocation proportion: http://imgur.com/GNUJQsz
> > Here
> > > is
> > > > >> a chart that shows a breakdown of the allocations:
> > > > >> http://imgur.com/YjCXljE About 20% of the allocations are for the
> > > > >> EOFExceptions. This seems kind of crazy especially given that this
> > > > happens
> > > > >> about 10 thousand times a second. The rest of the allocations seem
> > to
> > > be
> > > > >> spread all over but again seem excessive given how we are getting
> > very
> > > > few
> > > > >> messages.
> > > > >>
> > > > >> As a comparison, we also run a wrapper over the old SimpleConsumer
> > > that
> > > > >> gets a lot more data (10 -15 thousand 70 byte messages/sec on a
> > > > different
> > > > >> topic) and it is able to handle that load without much trouble. At
> > > this
> > > > >> moment we are completely puzzled by this performance. Most of it
> > does
> > > > seem
> > > > >> to be due to the crazy volumes of exceptions. Note: Our messages
> > seem
> > > to
> > > > >> all be making through. The exceptions are caught by Kafka's stack
> > and
> > > > never
> > > > >> bubble though to us.
> > > > >>
> > > > >> Are we doing anything wrong with how we are using the new consumer
> > > > >> (longer timeouts of a 100 second odd don't seem to help)?
> > > > >>
> > > > >> Thanks in advance,
> > > > >>
> > > > >> Rajiv
> > > > >>
> > > > >>
> > > > >>
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Reply via email to