Nandish Kotadia created KAFKA-5868:
--------------------------------------

             Summary: Kafka Consumer Rebalancing takes too long
                 Key: KAFKA-5868
                 URL: https://issues.apache.org/jira/browse/KAFKA-5868
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.0
            Reporter: Nandish Kotadia


I have a Kafka Streams application that reads data from a few topics, joins 
it, and writes the result to another topic.

Kafka Configuration:

5 Kafka brokers
Kafka topics: 15 partitions each, replication factor 3.
A few million records are consumed/produced every hour. Whenever I take any 
Kafka broker down, the application goes into rebalancing, and the rebalance 
takes approximately 30 minutes, sometimes even longer.

Does anyone have an idea how to solve this rebalancing issue in the Kafka 
consumer? The application also frequently throws the exception shown below 
while rebalancing.

This is stopping us from going live in a production environment with this 
setup. Any help would be appreciated.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned 
the partitions to another member. This means that the time between subsequent 
calls to poll() was longer than the configured max.poll.interval.ms, which 
typically implies that the poll loop is spending too much time message 
processing. You can address this either by increasing the session timeout or by 
reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
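
The relationship described in the exception message (time between poll() calls 
versus max.poll.interval.ms and max.poll.records) can be sketched with simple 
arithmetic. This is an illustration only: the per-record processing time and 
the 300000 ms interval are hypothetical values chosen for the example, not 
measurements or settings from this application, and the class and method names 
are made up.

```java
class PollIntervalSketch {
    // Worst-case time spent processing one batch returned by poll(),
    // assuming every record costs the same (a simplification).
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if processing a full batch would take longer than the allowed
    // gap between two poll() calls.
    static boolean exceedsPollInterval(int maxPollRecords,
                                       long perRecordMillis,
                                       long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        int maxPollRecords = 100;          // max.poll.records from this report
        long perRecordMillis = 50;         // hypothetical processing cost per record
        long maxPollIntervalMs = 300_000;  // hypothetical interval, for illustration

        System.out.println("worst-case batch time (ms): "
                + worstCaseBatchMillis(maxPollRecords, perRecordMillis));
        System.out.println("exceeds max.poll.interval.ms: "
                + exceedsPollInterval(maxPollRecords, perRecordMillis, maxPollIntervalMs));
    }
}
```

Under these assumptions a full batch stays well below the interval. The check 
only flips when the per-record cost or the batch size grows substantially.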


Kafka Streams Config:

bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000
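
For reference, a minimal sketch of how settings like these could be assembled 
in Java. It uses only java.util.Properties so that it runs without the Kafka 
client on the classpath. The class name and the application.id value are 
hypothetical (the real application id is not shown in this report); in a real 
application the resulting Properties would be handed to the Kafka Streams 
configuration.

```java
import java.util.Properties;

class StreamsConfigSketch {
    // Builds the three settings listed above, plus a placeholder application id.
    static Properties build() {
        Properties props = new Properties();
        // Hypothetical value; the actual application.id is not given in this report.
        props.setProperty("application.id", "my-streams-app");
        props.setProperty("bootstrap.servers",
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
        props.setProperty("max.poll.records", "100");
        props.setProperty("request.timeout.ms", "40000");
        return props;
    }

    public static void main(String[] args) {
        // Print each configured key/value pair.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Values are kept as strings here, matching how they would appear in a 
properties file.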

The ConsumerConfig that Kafka Streams creates internally is:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
