Nandish Kotadia created KAFKA-5868:
--------------------------------------
Summary: Kafka Consumer Rebalancing takes too long
Key: KAFKA-5868
URL: https://issues.apache.org/jira/browse/KAFKA-5868
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.11.0.0
Reporter: Nandish Kotadia
I have a Kafka Streams application that reads data from a few topics, joins
it, and writes the result to another topic.
Kafka Configuration:
5 Kafka brokers
Kafka topics: 15 partitions, replication factor 3.
A few million records are consumed and produced every hour. Whenever I take
any Kafka broker down, the application goes into rebalancing, and the rebalance
takes approximately 30 minutes, sometimes even longer.
Does anyone have an idea how to solve this rebalancing issue in the Kafka
consumer? It also frequently throws an exception while rebalancing.
This is stopping us from going live in our production environment with this
setup. Any help would be appreciated.
Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned
the partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout or by
reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
Kafka Streams Config:
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
max.poll.records = 100
request.timeout.ms=40000
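For reference, the three overrides above could be assembled in Java roughly as follows. This is only a minimal sketch: the class and method names (StreamsConfigSketch, reportedOverrides) are hypothetical, the keys are written as plain strings so the snippet runs without the Kafka client on the classpath, and other required settings such as application.id are deliberately left out because they are not shown in this report.

```java
import java.util.Properties;

// Hypothetical helper, not taken from the reporter's application.
final class StreamsConfigSketch {

    // Collects the three overrides listed in this report under their
    // documented Kafka configuration key names.
    static Properties reportedOverrides() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
        props.setProperty("max.poll.records", "100");
        props.setProperty("request.timeout.ms", "40000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reportedOverrides();
        // A real application would add application.id and hand these
        // properties to Kafka Streams; here they are only printed.
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(key -> System.out.println(key + " = " + props.getProperty(key)));
    }
}
```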
The ConsumerConfig it creates internally is:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
check.crcs = true
client.id = conversion-live-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
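As a rough sanity check on the timing values in this dump, the sketch below works out two figures: the per-record processing budget implied by max.poll.interval.ms and max.poll.records, and whether heartbeat.interval.ms is at most one third of session.timeout.ms, which the Kafka documentation gives as typical guidance. The class and method names are hypothetical, and the arithmetic assumes a full batch is processed between two poll() calls.

```java
// Hypothetical helper for checking the timing settings printed above.
final class ConsumerTimingSketch {

    // Upper bound on the average processing time per record (in ms) before
    // the gap between two poll() calls exceeds max.poll.interval.ms,
    // assuming every poll() returns a full batch of maxPollRecords.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    // True when the heartbeat interval is no more than one third of the
    // session timeout.
    static boolean heartbeatWithinThird(long heartbeatIntervalMs, long sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values copied from the configuration dump in this report.
        long maxPollIntervalMs = 2147483647L;
        int maxPollRecords = 100;
        long heartbeatIntervalMs = 3000L;
        long sessionTimeoutMs = 10000L;

        System.out.println("per-record budget (ms): "
                + perRecordBudgetMs(maxPollIntervalMs, maxPollRecords));
        System.out.println("heartbeat <= session timeout / 3: "
                + heartbeatWithinThird(heartbeatIntervalMs, sessionTimeoutMs));
    }
}
```

With the values shown here, the poll interval is effectively unbounded, so the explanation suggested in the exception message (slow message processing between polls) may not be the whole story for this setup.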
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)