[ https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488588#comment-16488588 ]

Uwe Eisele commented on KAFKA-6762:
-----------------------------------

After a restart of the broker, the log-cleaner thread runs into the same problem 
again. While debugging the log cleaner we saw that the affected log segment 
contains only deletable control batches in at least its first 1000012 bytes. In 
this case the log cleaner reports that zero messages have been read and finally 
throws the IllegalStateException. It seems that the log cleaner cannot handle 
chunks of a segment that contain only deletable records, as the following 
excerpt from LogCleaner.cleanInto illustrates:


{code:scala}
092 class LogCleaner ... {
...
558 private[log] def cleanInto(...) {
595   while (position < sourceRecords.sizeInBytes) {
...
601      sourceRecords.readInto(readBuffer, position)
602      val records = MemoryRecords.readableRecords(readBuffer)
...
604      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
         // in this case an empty result with zero read messages is returned, even though the read
         // buffer contains records (as mentioned above, all of them are deletable control batches)
...
626      if (readBuffer.limit() > 0 && result.messagesRead == 0)
627        growBuffers(maxLogMessageSize)
           // throws the IllegalStateException if the readBuffer has already reached the max message size
628   }
...
{code}

The reason no records are returned by the filter method is the following code 
block in the MemoryRecords class, where batches to be deleted are skipped 
without ever being counted as read messages:
{code:java}
141 private static FilterResult filterTo(...) {
...
154   for (MutableRecordBatch batch : batches) {
155     bytesRead += batch.sizeInBytes();
156
157     BatchRetention batchRetention = filter.checkBatchRetention(batch);
158     if (batchRetention == BatchRetention.DELETE)
159       continue;
{code}
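To make the interaction concrete, here is a simplified, self-contained sketch of 
the behaviour described above (class and field names are hypothetical, not the 
actual Kafka types, and it counts one message per retained batch for brevity): 
when every batch in the read buffer is a deletable control batch, the filter 
skips all of them, so messagesRead stays 0 while bytesRead grows, and the 
cleaner then misinterprets the empty result as a record larger than its buffers.

{code:java}
import java.util.List;

// Simplified model of the interaction between LogCleaner.cleanInto and
// MemoryRecords.filterTo; names and sizes below are hypothetical.
public class CleanerFilterSketch {

    enum BatchRetention { DELETE, RETAIN }

    // Stand-in for a record batch: its size and whether it is a control batch
    // whose retention period has passed (i.e. deletable).
    record Batch(int sizeInBytes, boolean deletableControlBatch) {}

    record FilterResult(int bytesRead, int messagesRead) {}

    // Mirrors the skipping behaviour shown in MemoryRecords.filterTo above:
    // DELETE batches add to bytesRead but are never counted as read messages.
    static FilterResult filterTo(List<Batch> batches) {
        int bytesRead = 0;
        int messagesRead = 0;
        for (Batch batch : batches) {
            bytesRead += batch.sizeInBytes();
            BatchRetention retention = batch.deletableControlBatch()
                    ? BatchRetention.DELETE : BatchRetention.RETAIN;
            if (retention == BatchRetention.DELETE)
                continue;      // skipped without counting anything
            messagesRead++;    // simplification: one message per retained batch
        }
        return new FilterResult(bytesRead, messagesRead);
    }

    public static void main(String[] args) {
        // A buffer slice that contains nothing but deletable control batches
        // (arbitrary sizes), as in the first ~1000012 bytes of the attached segment.
        List<Batch> onlyDeletableControlBatches = List.of(
                new Batch(78, true), new Batch(78, true), new Batch(78, true));

        FilterResult result = filterTo(onlyDeletableControlBatches);
        // Prints "bytesRead=234 messagesRead=0": the read buffer was not empty,
        // yet zero messages were read. LogCleaner.cleanInto takes this as "the
        // next message does not fit", calls growBuffers() until the buffer would
        // exceed message.max.bytes, and then throws the IllegalStateException
        // seen in the stack trace below.
        System.out.println("bytesRead=" + result.bytesRead()
                + " messagesRead=" + result.messagesRead());
    }
}
{code}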

The segment that caused this problem is attached as 
[^__consumer_offsets-9_00000000000000000000.tar.xz].

> log-cleaner thread terminates due to java.lang.IllegalStateException
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6762
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6762
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.0
>         Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>            Reporter: Ricardo Bartolome
>            Priority: Major
>         Attachments: __consumer_offsets-9_00000000000000000000.tar.xz
>
>
> We are experiencing some problems with the Kafka log-cleaner thread on Kafka 
> 1.0.0. We plan to update this cluster to 1.1.0 next week in order to fix 
> KAFKA-6683, but until then we can only confirm that it happens in 1.0.0.
> The log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144 bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288 bytes to 1000012 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 1000012.
>         at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
>         at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
>         at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
>         at scala.collection.immutable.List.foreach(List.scala:389)
>         at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:372)
>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition 
> __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
>  * We are unable to reproduce it yet in a consistent manner.
>  * It only happens in the PRO cluster and not in the PRE cluster for the same 
> customer (whose message payloads are very similar).
>  * Checking our Kafka logs, it only happened on the internal topics 
> *__consumer_offsets-**
>  * When we restart the broker process, the log-cleaner starts working again, 
> but it can take anywhere from 3 minutes to several hours to die again.
>  * We work around it by temporarily increasing the message.max.bytes and 
> replica.fetch.max.bytes values to 10485760 (10 MB) from the default 1000012 
> (~1 MB); see the snippet after this list.
>  ** Before message.max.bytes = 10 MB, we tried to match message.max.bytes with 
> the value of replica.fetch.max.bytes (1048576), but the log-cleaner died with 
> the same error, just with a different limit.
>  ** This allowed the log-cleaner to keep running and compact enough data for 
> disk usage to go from ~600 GB to ~100 GB.
>  ** Without this limit change, the log-cleaner dies after a while, the used 
> disk space stays at ~450 GB, and it then starts growing again due to cluster 
> activity.
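> For reference, the temporary workaround above amounts to the following two 
> overrides in server.properties (values exactly as stated in the list; shown 
> here only to make the change explicit):
> {code:java}
> # Temporary workaround: raise both limits from the 1000012-byte default to 10 MB
> message.max.bytes=10485760
> replica.fetch.max.bytes=10485760
> {code}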
> Our server.properties content is as follows, as printed in server.log at 
> broker startup.
> {code:java}
> broker.id=11
> delete.topic.enable=true
> advertised.listeners=PLAINTEXT://broker-ip:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/var/lib/kafka
> num.partitions=12
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=2
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> zookeeper.connect=x.x.x.x:2181/kafka/cluster01
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=3000
> auto.create.topics.enable=false
> inter.broker.protocol.version=0.11.0
> log.message.format.version=0.11.0
> broker.rack=eu-west-1b
> default.replication.factor=3
> offsets.retention.minutes=10080
> {code}
> Our Kafka configuration values, as printed in server.log at startup, are the 
> following.
> {code:java}
> [2018-03-28 10:40:29,652] INFO KafkaConfig values:
>         advertised.host.name = null
>         advertised.listeners = PLAINTEXT://broker_ip:9092
>         advertised.port = null
>         alter.config.policy.class.name = null
>         authorizer.class.name =
>         auto.create.topics.enable = false
>         auto.leader.rebalance.enable = true
>         background.threads = 10
>         broker.id = 11
>         broker.id.generation.enable = true
>         broker.rack = eu-west-1b
>         compression.type = producer
>         connections.max.idle.ms = 600000
>         controlled.shutdown.enable = true
>         controlled.shutdown.max.retries = 3
>         controlled.shutdown.retry.backoff.ms = 5000
>         controller.socket.timeout.ms = 30000
>         create.topic.policy.class.name = null
>         default.replication.factor = 3
>         delete.records.purgatory.purge.interval.requests = 1
>         delete.topic.enable = true
>         fetch.purgatory.purge.interval.requests = 1000
>         group.initial.rebalance.delay.ms = 3000
>         group.max.session.timeout.ms = 300000
>         group.min.session.timeout.ms = 6000
>         host.name =
>         inter.broker.listener.name = null
>         inter.broker.protocol.version = 0.11.0
>         leader.imbalance.check.interval.seconds = 300
>         leader.imbalance.per.broker.percentage = 10
>         listener.security.protocol.map = 
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
>         listeners = null
>         log.cleaner.backoff.ms = 15000
>         log.cleaner.dedupe.buffer.size = 134217728
>         log.cleaner.delete.retention.ms = 86400000
>         log.cleaner.enable = true
>         log.cleaner.io.buffer.load.factor = 0.9
>         log.cleaner.io.buffer.size = 524288
>         log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
>         log.cleaner.min.cleanable.ratio = 0.5
>         log.cleaner.min.compaction.lag.ms = 0
>         log.cleaner.threads = 1
>         log.cleanup.policy = [delete]
>         log.dir = /tmp/kafka-logs
>         log.dirs = /var/lib/kafka
>         log.flush.interval.messages = 9223372036854775807
>         log.flush.interval.ms = null
>         log.flush.offset.checkpoint.interval.ms = 60000
>         log.flush.scheduler.interval.ms = 9223372036854775807
>         log.flush.start.offset.checkpoint.interval.ms = 60000
>         log.index.interval.bytes = 4096
>         log.index.size.max.bytes = 10485760
>         log.message.format.version = 0.11.0
>         log.message.timestamp.difference.max.ms = 9223372036854775807
>         log.message.timestamp.type = CreateTime
>         log.preallocate = false
>         log.retention.bytes = -1
>         log.retention.check.interval.ms = 300000
>         log.retention.hours = 168
>         log.retention.minutes = null
>         log.retention.ms = null
>         log.roll.hours = 168
>         log.roll.jitter.hours = 0
>         log.roll.jitter.ms = null
>         log.roll.ms = null
>         log.segment.bytes = 1073741824
>         log.segment.delete.delay.ms = 60000
>         max.connections.per.ip = 2147483647
>         max.connections.per.ip.overrides =
>         message.max.bytes = 1000012
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         min.insync.replicas = 1
>         num.io.threads = 8
>         num.network.threads = 3
>         num.partitions = 12
>         num.recovery.threads.per.data.dir = 1
>         num.replica.fetchers = 1
>         offset.metadata.max.bytes = 4096
>         offsets.commit.required.acks = -1
>         offsets.commit.timeout.ms = 5000
>         offsets.load.buffer.size = 5242880
>         offsets.retention.check.interval.ms = 600000
>         offsets.retention.minutes = 10080
>         offsets.topic.compression.codec = 0
>         offsets.topic.num.partitions = 50
>         offsets.topic.replication.factor = 3
>         offsets.topic.segment.bytes = 104857600
>         port = 9092
>         principal.builder.class = null
>         producer.purgatory.purge.interval.requests = 1000
>         queued.max.request.bytes = -1
>         queued.max.requests = 500
>         quota.consumer.default = 9223372036854775807
>         quota.producer.default = 9223372036854775807
>         quota.window.num = 11
>         quota.window.size.seconds = 1
>         replica.fetch.backoff.ms = 1000
>         replica.fetch.max.bytes = 1048576
>         replica.fetch.min.bytes = 1
>         replica.fetch.response.max.bytes = 10485760
>         replica.fetch.wait.max.ms = 500
>         replica.high.watermark.checkpoint.interval.ms = 5000
>         replica.lag.time.max.ms = 10000
>         replica.socket.receive.buffer.bytes = 65536
>         replica.socket.timeout.ms = 30000
>         replication.quota.window.num = 11
>         replication.quota.window.size.seconds = 1
>         request.timeout.ms = 30000
>         reserved.broker.max.id = 1000
>         sasl.enabled.mechanisms = [GSSAPI]
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.principal.to.local.rules = [DEFAULT]
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism.inter.broker.protocol = GSSAPI
>         security.inter.broker.protocol = PLAINTEXT
>         socket.receive.buffer.bytes = 102400
>         socket.request.max.bytes = 104857600
>         socket.send.buffer.bytes = 102400
>         ssl.cipher.suites = null
>         ssl.client.auth = none
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
>         transaction.max.timeout.ms = 900000
>         transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
>         transaction.state.log.load.buffer.size = 5242880
>         transaction.state.log.min.isr = 2
>         transaction.state.log.num.partitions = 50
>         transaction.state.log.replication.factor = 3
>         transaction.state.log.segment.bytes = 104857600
>         transactional.id.expiration.ms = 604800000
>         unclean.leader.election.enable = false
>         zookeeper.connect = (zookeeper connection string here)
>         zookeeper.connection.timeout.ms = 6000
>         zookeeper.session.timeout.ms = 6000
>         zookeeper.set.acl = false
>         zookeeper.sync.time.ms = 2000
>  (kafka.server.KafkaConfig)
> [2018-03-28 10:40:29,745] INFO starting (kafka.server.KafkaServer)
> {code}
> The clients (spring-kafka 2.1.4, which provides kafka-clients 1.0.x) have the 
> following configuration:
> * max.partition.fetch.bytes = 1048576
> While it looks like increasing the message size might do the trick, we are 
> concerned that this will happen again as soon as customers start publishing 
> messages closer to the message.max.bytes limit. We decided to open this bug 
> report because we think something might be wrong if message.max.bytes is 1 MB 
> and the log-cleaner throws an exception while compacting messages on 
> __consumer_offsets because it finds messages that are bigger than that.


