Hi Ryanne,

Thanks for your response. I had even tried with 5 records and session
timeout as big as 10 minutes. Logs still showed that consumer group
rebalanced many times.
Also there is another mystery, some CGs take upto 10 minutes to subscribe
to topic and start consumption. Why might that be happening, any idea?

On Tue, Aug 28, 2018 at 8:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Shantanu,
>
> Sounds like your consumers are processing too many records between poll()s.
> Notice that max.poll.records is 50. If your consumer is taking up to 200ms
> to process each record, then you'd see up to 10 seconds between poll()s.
>
> If a consumer doesn't call poll() frequently enough, Kafka will consider
> the consumer to be dead and will rebalance away from it. Since all your
> consumers are in this state, your consumer group is constantly rebalancing.
>
> Fix is easy: reduce max.poll.records.
>
> Ryanne
>
> On Tue, Aug 28, 2018 at 6:34 AM Shantanu Deshmukh <shantanu...@gmail.com>
> wrote:
>
> > Someone, please help me. Only 1 or 2 out of 7 consumer groups keep
> > rebalancing every 5-10mins. One topic is constantly receiving 10-20
> > msg/sec. The other one receives a bulk load after many hours of
> inactivity.
> > CGs for both these topics are different. So, I see no observable pattern
> > here.
> >
> > On Wed, Aug 22, 2018 at 5:47 PM Shantanu Deshmukh <shantanu...@gmail.com
> >
> > wrote:
> >
> > > I know average time of processing one record, it is about 70-80ms. I
> have
> > > set session.timeout.ms so high total processing time for one poll
> > > invocation should be well within it.
> > >
> > > On Wed, Aug 22, 2018 at 5:04 PM Steve Tian <steve.cs.t...@gmail.com>
> > > wrote:
> > >
> > >> Have you measured the duration between two `poll` invocations and the
> > size
> > >> of returned `ConsumrRecords`?
> > >>
> > >> On Wed, Aug 22, 2018, 7:00 PM Shantanu Deshmukh <
> shantanu...@gmail.com>
> > >> wrote:
> > >>
> > >> > Ohh sorry, my bad. Kafka version is 0.10.1.0 indeed and so is the
> > >> client.
> > >> >
> > >> > On Wed, Aug 22, 2018 at 4:26 PM Steve Tian <steve.cs.t...@gmail.com
> >
> > >> > wrote:
> > >> >
> > >> > > NVM.  What's your client version?  I'm asking as
> > max.poll.interval.ms
> > >> > > should be introduced since 0.10.1.0, which is not the version you
> > >> > mentioned
> > >> > > in the email thread.
> > >> > >
> > >> > > On Wed, Aug 22, 2018, 6:51 PM Shantanu Deshmukh <
> > >> shantanu...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > How do I check for GC pausing?
> > >> > > >
> > >> > > > On Wed, Aug 22, 2018 at 4:12 PM Steve Tian <
> > steve.cs.t...@gmail.com
> > >> >
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Did you observed any GC-pausing?
> > >> > > > >
> > >> > > > > On Wed, Aug 22, 2018, 6:38 PM Shantanu Deshmukh <
> > >> > shantanu...@gmail.com
> > >> > > >
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi Steve,
> > >> > > > > >
> > >> > > > > > Application is just sending mails. Every record is just a
> > email
> > >> > > request
> > >> > > > > > with very basic business logic. Generally it doesn't take
> more
> > >> than
> > >> > > > 200ms
> > >> > > > > > to process a single mail. Currently it is averaging out at
> > 70-80
> > >> > ms.
> > >> > > > > >
> > >> > > > > > On Wed, Aug 22, 2018 at 3:06 PM Steve Tian <
> > >> > steve.cs.t...@gmail.com>
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > How long did it take to process 50 `ConsumerRecord`s?
> > >> > > > > > >
> > >> > > > > > > On Wed, Aug 22, 2018, 5:16 PM Shantanu Deshmukh <
> > >> > > > shantanu...@gmail.com
> > >> > > > > >
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hello,
> > >> > > > > > > >
> > >> > > > > > > > We have Kafka 0.10.0.1 running on a 3 broker cluster. We
> > >> have
> > >> > an
> > >> > > > > > > > application which consumes from a topic having 10
> > >> partitions.
> > >> > 10
> > >> > > > > > > consumers
> > >> > > > > > > > are spawned from this process, they belong to one
> consumer
> > >> > group.
> > >> > > > > > > >
> > >> > > > > > > > What we have observed is that very frequently we are
> > >> observing
> > >> > > such
> > >> > > > > > > > messages in consumer logs
> > >> > > > > > > >
> > >> > > > > > > > [2018-08-21 11:12:46] :: WARN  ::
> ConsumerCoordinator:554
> > -
> > >> > Auto
> > >> > > > > offset
> > >> > > > > > > > commit failed for group otp-email-consumer: Commit
> cannot
> > be
> > >> > > > > completed
> > >> > > > > > > > since the group has already rebalanced and assigned the
> > >> > > partitions
> > >> > > > to
> > >> > > > > > > > another member. This means that the time between
> > subsequent
> > >> > calls
> > >> > > > to
> > >> > > > > > > poll()
> > >> > > > > > > > was longer than the configured max.poll.interval.ms,
> > which
> > >> > > > typically
> > >> > > > > > > > implies that the poll loop is spending too much time
> > message
> > >> > > > > > processing.
> > >> > > > > > > > You can address this either by increasing the session
> > >> timeout
> > >> > or
> > >> > > by
> > >> > > > > > > > reducing the maximum size of batches returned in poll()
> > with
> > >> > > > > > > > max.poll.records.
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> ConsumerCoordinator:333
> > -
> > >> > > > Revoking
> > >> > > > > > > > previously assigned partitions [otp-email-1,
> otp-email-0,
> > >> > > > > otp-email-3,
> > >> > > > > > > > otp-email-2] for group otp-email-consumer
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> AbstractCoordinator:381
> > -
> > >> > > > > > (Re-)joining
> > >> > > > > > > > group otp-email-consumer
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> AbstractCoordinator:600
> > -
> > >> > > > *Marking
> > >> > > > > > the
> > >> > > > > > > > coordinator x.x.x.x:9092 (id: 2147483646
> > <(214)%20748-3646> rack: null) dead
> > >> for
> > >> > > group
> > >> > > > > > > > otp-email-consumer*
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> AbstractCoordinator:600
> > -
> > >> > > > *Marking
> > >> > > > > > the
> > >> > > > > > > > coordinator x.x.x.x:9092 (id: 2147483646
> > <(214)%20748-3646> rack: null) dead
> > >> for
> > >> > > group
> > >> > > > > > > > otp-email-consumer*
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> > >> > > > > > > > AbstractCoordinator$GroupCoordinatorResponseHandler:555
> -
> > >> > > > Discovered
> > >> > > > > > > > coordinator 10.189.179.117:9092 (id: 2147483646
> > <(214)%20748-3646> rack: null)
> > >> > for
> > >> > > > > group
> > >> > > > > > > > otp-email-consumer.
> > >> > > > > > > > [2018-08-21 11:12:46] :: INFO  ::
> AbstractCoordinator:381
> > -
> > >> > > > > > (Re-)joining
> > >> > > > > > > > group otp-email-consumer
> > >> > > > > > > >
> > >> > > > > > > > After this, the group enters rebalancing phase and it
> > takes
> > >> > about
> > >> > > > > 5-10
> > >> > > > > > > > minutes to start consuming messages again.
> > >> > > > > > > > What does this message mean? The actual broker doesn't
> go
> > >> down
> > >> > > as
> > >> > > > > per
> > >> > > > > > > our
> > >> > > > > > > > monitoring tools. So how come it is declared dead?
> Please
> > >> > help, I
> > >> > > > am
> > >> > > > > > > stuck
> > >> > > > > > > > on this issue since 2 months now.
> > >> > > > > > > >
> > >> > > > > > > > Here's our consumer configuration
> > >> > > > > > > > auto.commit.interval.ms = 3000
> > >> > > > > > > > auto.offset.reset = latest
> > >> > > > > > > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092,
> > >> x.x.x.x:9092]
> > >> > > > > > > > check.crcs = true
> > >> > > > > > > > client.id =
> > >> > > > > > > > connections.max.idle.ms = 540000
> > >> > > > > > > > enable.auto.commit = true
> > >> > > > > > > > exclude.internal.topics = true
> > >> > > > > > > > fetch.max.bytes = 52428800
> > >> > > > > > > > fetch.max.wait.ms = 500
> > >> > > > > > > > fetch.min.bytes = 1
> > >> > > > > > > > group.id = otp-notifications-consumer
> > >> > > > > > > > heartbeat.interval.ms = 3000
> > >> > > > > > > > interceptor.classes = null
> > >> > > > > > > > key.deserializer = class
> > >> org.apache.kafka.common.serialization.
> > >> > > > > > > > StringDeserializer
> > >> > > > > > > > max.partition.fetch.bytes = 1048576
> > >> > > > > > > > max.poll.interval.ms = 300000
> > >> > > > > > > > max.poll.records = 50
> > >> > > > > > > > metadata.max.age.ms = 300000
> > >> > > > > > > > metric.reporters = []
> > >> > > > > > > > metrics.num.samples = 2
> > >> > > > > > > > metrics.sample.window.ms = 30000
> > >> > > > > > > > partition.assignment.strategy = [class
> > >> > org.apache.kafka.clients.
> > >> > > > > > > > consumer.RangeAssignor]
> > >> > > > > > > > receive.buffer.bytes = 65536
> > >> > > > > > > > reconnect.backoff.ms = 50
> > >> > > > > > > > request.timeout.ms = 305000
> > >> > > > > > > > retry.backoff.ms = 100
> > >> > > > > > > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > >> > > > > > > > sasl.kerberos.min.time.before.relogin = 60000
> > >> > > > > > > > sasl.kerberos.service.name = null
> > >> > > > > > > > sasl.kerberos.ticket.renew.jitter = 0.05
> > >> > > > > > > > sasl.kerberos.ticket.renew.window.factor = 0.8
> > >> > > > > > > > sasl.mechanism = GSSAPI
> > >> > > > > > > > security.protocol = SSL
> > >> > > > > > > > send.buffer.bytes = 131072
> > >> > > > > > > > session.timeout.ms = 300000
> > >> > > > > > > > ssl.cipher.suites = null
> > >> > > > > > > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > >> > > > > > > > ssl.endpoint.identification.algorithm = null
> > >> > > > > > > > ssl.key.password = null
> > >> > > > > > > > ssl.keymanager.algorithm = SunX509
> > >> > > > > > > > ssl.keystore.location = null
> > >> > > > > > > > ssl.keystore.password = null
> > >> > > > > > > > ssl.keystore.type = JKS
> > >> > > > > > > > ssl.protocol = TLS
> > >> > > > > > > > ssl.provider = null
> > >> > > > > > > > ssl.secure.random.implementation = null
> > >> > > > > > > > ssl.trustmanager.algorithm = PKIX
> > >> > > > > > > > ssl.truststore.location = /x/x/client.truststore.jks
> > >> > > > > > > > ssl.truststore.password = [hidden]
> > >> > > > > > > > ssl.truststore.type = JKS
> > >> > > > > > > > value.deserializer = class
> > >> > org.apache.kafka.common.serialization.
> > >> > > > > > > > StringDeserializer
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Reply via email to