[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16435653#comment-16435653
 ] 

Ricardo Bartolome commented on KAFKA-6762:
------------------------------------------

Hello [~yuzhih...@gmail.com]. We enabled the DEBUG on the log-cleaner on a PRE 
cluster to see impact and today we enabled it in PRO too.

Nevertheless, we upgraded to Kafka 1.1.0 this week performing a rolling upgrade 
(terminating EC2 instances and spinning new ones that replicate data from 
others) and so far no broker has been affected by this. We are pretty lost on 
how to reproduce this, and so far we have been unable to reproduce it.

If it happens again, I'd expect to see something in the log-cleaner.log.

Our change has been as follows:
{code}
diff -u /etc/kafka/log4j.properties /tmp/log4j.properties
--- /etc/kafka/log4j.properties 2018-02-13 14:10:09.000000000 +0000
+++ /tmp/log4j.properties       2018-04-07 15:39:19.371814641 +0000
@@ -88,7 +88,7 @@
 log4j.logger.kafka.controller=INFO, controllerAppender
 log4j.additivity.kafka.controller=false

-log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.logger.kafka.log.LogCleaner=DEBUG, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false

 log4j.logger.state.change.logger=INFO, stateChangeAppender
{code}



> log-cleaner thread terminates due to java.lang.IllegalStateException
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6762
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6762
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.0
>         Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>            Reporter: Ricardo Bartolome
>            Priority: Major
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogClea
> ner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 1000012.
>         at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
>         at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
>         at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
>         at scala.collection.immutable.List.foreach(List.scala:389)
>         at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:372)
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition 
> __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
>  * We are unable to reproduce it yet in a consistent manner.
>  * It only happens in the PRO cluster and not in the PRE cluster for the same 
> customer (which message payloads are very similar)
>  * Checking our Kafka logs, it only happened on the internal topics 
> *__consumer_offsets-**
>  * When we restart the broker process the log-cleaner starts working again 
> but it can take between 3 minutes and some hours to die again.
>  * We workaround it by temporary increasing the message.max.bytes and 
> replica.fetch.max.bytes values to 10485760 (10MB) from default 1000012 (~1MB).
> ** Before message.max.bytes = 10MB, we tried to match message.max.size with 
> the value of replica.fetch.max.size (1048576), but log-cleaned died with the 
> same error but different limit.
>  ** This allowed the log-cleaner not to die and compact enough data as for 
> disk space to go from ~600GB to ~100GB.
>  ** Without this limit change, the log-cleaner dies after a while and the 
> used disk space stay at ~450GB and starts growing again due to cluster 
> activity.
> Our server.properties content is as follows, as printed ins server.log at 
> broker startup.
> {code:java}
> broker.id=11
> delete.topic.enable=true
> advertised.listeners=PLAINTEXT://broker-ip:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/var/lib/kafka
> num.partitions=12
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=2
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> zookeeper.connect=x.x.x.x:2181/kafka/cluster01
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=3000
> auto.create.topics.enable=false
> inter.broker.protocol.version=0.11.0
> log.message.format.version=0.11.0
> broker.rack=eu-west-1b
> default.replication.factor=3
> offsets.retention.minutes=10080
> {code}
> Our kafka configuration values, as printed in server.log when starting is the 
> following.
> {code:java}
> [2018-03-28 10:40:29,652] INFO KafkaConfig values:
>         advertised.host.name = null
>         advertised.listeners = PLAINTEXT://broker_ip:9092
>         advertised.port = null
>         alter.config.policy.class.name = null
>         authorizer.class.name =
>         auto.create.topics.enable = false
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         broker.id = 11
>         broker.id.generation.enable = true
>         broker.rack = eu-west-1b
>         compression.type = producer
>         connections.max.idle.ms = 600000
>         controlled.shutdown.enable = true
>         controlled.shutdown.max.retries = 3
>         controlled.shutdown.retry.backoff.ms = 5000
>         controller.socket.timeout.ms = 30000
>         create.topic.policy.class.name = null
>         default.replication.factor = 3
>         delete.records.purgatory.purge.interval.requests = 1
>         delete.topic.enable = true
>         fetch.purgatory.purge.interval.requests = 1000
>         group.initial.rebalance.delay.ms = 3000
>         group.max.session.timeout.ms = 300000
>         group.min.session.timeout.ms = 6000
>         host.name =
>         inter.broker.listener.name = null
>         inter.broker.protocol.version = 0.11.0
>         leader.imbalance.check.interval.seconds = 300
>         leader.imbalance.per.broker.percentage = 10
>         listener.security.protocol.map = 
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
>         listeners = null
>         log.cleaner.backoff.ms = 15000
>         log.cleaner.dedupe.buffer.size = 134217728
>         log.cleaner.delete.retention.ms = 86400000
>         log.cleaner.enable = true
>         log.cleaner.io.buffer.load.factor = 0.9
>         log.cleaner.io.buffer.size = 524288
>         log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
>         log.cleaner.min.cleanable.ratio = 0.5
>         log.cleaner.min.compaction.lag.ms = 0
>         log.cleaner.threads = 1
>         log.cleanup.policy = [delete]
>         log.dir = /tmp/kafka-logs
>         log.dirs = /var/lib/kafka
>         log.flush.interval.messages = 9223372036854775807
>         log.flush.interval.ms = null
>         log.flush.offset.checkpoint.interval.ms = 60000
>         log.flush.scheduler.interval.ms = 9223372036854775807
>         log.flush.start.offset.checkpoint.interval.ms = 60000
>         log.index.interval.bytes = 4096
>         log.index.size.max.bytes = 10485760
>         log.message.format.version = 0.11.0
>         log.message.timestamp.difference.max.ms = 9223372036854775807
>         log.message.timestamp.type = CreateTime
>         log.preallocate = false
>         log.retention.bytes = -1
>         log.retention.check.interval.ms = 300000
>         log.retention.hours = 168
>         log.retention.minutes = null
>         log.retention.ms = null
>         log.roll.hours = 168
>         log.roll.jitter.hours = 0
>         log.roll.jitter.ms = null
>         log.roll.ms = null
>         log.segment.bytes = 1073741824
>         log.segment.delete.delay.ms = 60000
>         max.connections.per.ip = 2147483647
>         max.connections.per.ip.overrides =
>         message.max.bytes = 1000012
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         min.insync.replicas = 1
>         num.io.threads = 8
>         num.network.threads = 3
>         num.partitions = 12
>         num.recovery.threads.per.data.dir = 1
>         num.replica.fetchers = 1
>         offset.metadata.max.bytes = 4096
>         offsets.commit.required.acks = -1
>         offsets.commit.timeout.ms = 5000
>         offsets.load.buffer.size = 5242880
>         offsets.retention.check.interval.ms = 600000
>         offsets.retention.minutes = 10080
>         offsets.topic.compression.codec = 0
>         offsets.topic.num.partitions = 50
>         offsets.topic.replication.factor = 3
>         offsets.topic.segment.bytes = 104857600
>         port = 9092
>         principal.builder.class = null
>         producer.purgatory.purge.interval.requests = 1000
>         queued.max.request.bytes = -1
>         queued.max.requests = 500
>         quota.consumer.default = 9223372036854775807
>         quota.producer.default = 9223372036854775807
>         quota.window.num = 11
>         quota.window.size.seconds = 1
>         replica.fetch.backoff.ms = 1000
>         replica.fetch.max.bytes = 1048576
>         replica.fetch.min.bytes = 1
>         replica.fetch.response.max.bytes = 10485760
>         replica.fetch.wait.max.ms = 500
>         replica.high.watermark.checkpoint.interval.ms = 5000
>         replica.lag.time.max.ms = 10000
>         replica.socket.receive.buffer.bytes = 65536
>         replica.socket.timeout.ms = 30000
>         replication.quota.window.num = 11
>         replication.quota.window.size.seconds = 1
>         request.timeout.ms = 30000
>         reserved.broker.max.id = 1000
>         sasl.enabled.mechanisms = [GSSAPI]
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.principal.to.local.rules = [DEFAULT]
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism.inter.broker.protocol = GSSAPI
>         security.inter.broker.protocol = PLAINTEXT
>         socket.receive.buffer.bytes = 102400
>         socket.request.max.bytes = 104857600
>         socket.send.buffer.bytes = 102400
>         ssl.cipher.suites = null
>         ssl.client.auth = none
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
>         transaction.max.timeout.ms = 900000
>         transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
>         transaction.state.log.load.buffer.size = 5242880
>         transaction.state.log.min.isr = 2
>         transaction.state.log.num.partitions = 50
>         transaction.state.log.replication.factor = 3
>         transaction.state.log.segment.bytes = 104857600
>         transactional.id.expiration.ms = 604800000
>         unclean.leader.election.enable = false
>         zookeeper.connect = (zookeeper connection string here)
>         zookeeper.connection.timeout.ms = 6000
>         zookeeper.session.timeout.ms = 6000
>         zookeeper.set.acl = false
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2018-03-28 10:40:29,745] INFO starting (kafka.server.KafkaServer)
> {code}
> The clients (sprint-kafka 2.1.4 that provides kafka-clients 1.0.x) have the 
> following configuration which is:
> * max.partition.fetch.bytes = 1048576
> While looks like increasing the message size might do the trick, we are 
> concerned that this happens again as soon as a customers start publishing 
> messages closer to the message.max.bytes limit. We decided to open this bug 
> report because we think something might be wrong if message.max.bytes is 1MB 
> and the log-cleaner throws an exception while compacting messages on 
> __consumer_offsets because it find messages that are bigger than that. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to