[
https://issues.apache.org/jira/browse/KAFKA-5868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219779#comment-16219779
]
Matthias J. Sax commented on KAFKA-5868:
----------------------------------------
This should be fixed in {{1.0.0}} via KAFKA-4593 -- {{1.0.0}} should be
released shortly. Can you please try it out and verify? Thx.
> Kafka Consumer Rebalancing takes too long
> -----------------------------------------
>
> Key: KAFKA-5868
> URL: https://issues.apache.org/jira/browse/KAFKA-5868
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
> Reporter: Nandish Kotadia
>
> I have a Kafka Streams application which takes data from a few topics, joins
> the data, and writes the result to another topic.
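> A simplified sketch of the topology (the topic names, serdes, and join window
> here are placeholders rather than the real ones; the application id is only
> inferred from the client.id in the config dump further down):
> {code:java}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
>
> public class JoinTopologySketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
>
>         KStreamBuilder builder = new KStreamBuilder();
>         // Placeholder input topics; the real topic names are not part of this ticket.
>         KStream<String, String> left = builder.stream(Serdes.String(), Serdes.String(), "input-a");
>         KStream<String, String> right = builder.stream(Serdes.String(), Serdes.String(), "input-b");
>
>         // Windowed stream-stream join; joined records go to a single output topic.
>         left.join(right,
>                 (l, r) -> l + "|" + r,
>                 JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
>                 Serdes.String(), Serdes.String(), Serdes.String())
>             .to(Serdes.String(), Serdes.String(), "joined-output");
>
>         KafkaStreams streams = new KafkaStreams(builder, props);
>         streams.start();
>     }
> }
> {code}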
> *Kafka Configuration:*
> * 5 Kafka brokers
> * Kafka topics: 15 partitions, replication factor 3.
> A few million records are consumed/produced every hour. Whenever I take any
> Kafka broker down, the group goes into rebalancing, and it takes approx. 30 minutes
> or sometimes even longer. The rebalance also kills many of my Kafka
> Streams processes.
> *Note: My Kafka Streams processes run on the same machines as the Kafka
> brokers.*
> Does anyone have an idea how to solve this rebalancing issue in the Kafka
> consumer? It also throws exceptions many times while rebalancing.
> This is stopping us from going live in a production environment with this
> setup. Any help would be appreciated.
> _Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> Commit cannot be completed since the group has already rebalanced and
> assigned the partitions to another member. This means that the time between
> subsequent calls to poll() was longer than the configured
> max.poll.interval.ms, which typically implies that the poll loop is spending
> too much time message processing. You can address this either by increasing
> the session timeout or by reducing the maximum size of batches returned in
> poll() with max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
> at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
> at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
> at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)_
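> For reference, the two remedies the exception message suggests (fewer records
> per poll, or a larger session timeout) could be overridden from the Streams
> configuration along these lines; the values are illustrative only, not a tested
> recommendation for this workload:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class RebalanceTuningSketch {
>     public static Properties tunedProps() {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
>         // Fewer records per poll(): less processing between polls, so a commit is
>         // less likely to arrive after the group has already rebalanced (illustrative value).
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 50);
>         // Larger session timeout: the member is not evicted from the group as
>         // quickly while processing catches up (illustrative value).
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
>         return props;
>     }
> }
> {code}
> (The config dump below already shows max.poll.interval.ms = 2147483647, so the
> poll-interval limit itself is unlikely to be what is expiring here.)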
> *Kafka Streams Config:*
> * bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
> * max.poll.records = 100
> * request.timeout.ms=40000
> The ConsumerConfig it creates internally is:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
> check.crcs = true
> client.id = conversion-live-StreamThread-1-restore-consumer
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id =
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> internal.leave.group.on.close = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 2147483647
> max.poll.records = 100
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)