[ https://issues.apache.org/jira/browse/KAFKA-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983866#comment-16983866 ]
Daniyar commented on KAFKA-9213:
--------------------------------

UPDATE: I've investigated another occurrence of this exception. For the analysis, I used:
1) a memory dump taken from the broker around 6:37 pm
2) the kafka log file
3) the kafka state-change log
4) the log, index and time-index files of the failed segment
5) Kafka source code, versions 2.3.1 and 1.1.0

Here's how the exception looks in the kafka log:
```
2019/11/19 16:03:00 INFO [ProducerStateManager partition=ad_group_metrics-62] Writing producer snapshot at offset 13886052 (kafka.log.ProducerStateManager)
2019/11/19 16:03:00 INFO [Log partition=ad_group_metrics-62, dir=/mnt/kafka] Rolled new log segment at offset 13886052 in 1 ms. (kafka.log.Log)
2019/11/19 16:03:00 ERROR [ReplicaManager broker=17] Error processing append operation on partition ad_group_metrics-62 (kafka.server.ReplicaManager)
2019/11/19 16:03:00 java.nio.BufferOverflowException
2019/11/19 16:03:00 at java.nio.Buffer.nextPutIndex(Buffer.java:527)
2019/11/19 16:03:00 at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
2019/11/19 16:03:00 at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
2019/11/19 16:03:00 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00 at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
2019/11/19 16:03:00 at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
2019/11/19 16:03:00 at scala.Option.foreach(Option.scala:407)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00 at kafka.log.Log.roll(Log.scala:1654)
2019/11/19 16:03:00 at kafka.log.Log.maybeRoll(Log.scala:1639)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$append$2(Log.scala:966)
2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00 at kafka.log.Log.append(Log.scala:850)
2019/11/19 16:03:00 at kafka.log.Log.appendAsLeader(Log.scala:819)
2019/11/19 16:03:00 at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
2019/11/19 16:03:00 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
2019/11/19 16:03:00 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
2019/11/19 16:03:00 at scala.collection.TraversableLike.map(TraversableLike.scala:238)
2019/11/19 16:03:00 at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
2019/11/19 16:03:00 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
2019/11/19 16:03:00 at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
2019/11/19 16:03:00 at kafka.server.KafkaApis.handle(KafkaApis.scala:113)
2019/11/19 16:03:00 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
2019/11/19 16:03:00 at java.lang.Thread.run(Thread.java:748)
...
```

What we see here is that a new segment was rolled at offset 13886052, and then an exception occurred while trying to mark _some_ segment as inactive ({{onBecomeInactiveSegment}}) when appending new messages to the Log. The timing between rolling a new segment and appending new messages doesn't seem to matter; there are many other similar exceptions that occur a few seconds after a new segment is rolled.

I managed to find the {{LogSegment}} object for offset 13886052 in the memory dump. Following the source code logic and checking the LogSegment state against the Kafka logs, I found that the {{TimeIndex}} object had somehow gotten into a state with 0 entries and 0 max possible entries (and an empty memory map). Having 0 entries is normal for a TimeIndex or OffsetIndex even when the Log contains records, as long as the appended data hasn't passed the index-interval threshold. But having 0 max possible entries along with 0 entries makes the TimeIndex considered full (0 entries == 0 max entries), which is what triggers rolling a new segment. The Log then tries to add a final timestamp to the TimeIndex and fails because of the empty memory map (BufferOverflowException). Neither the OffsetIndex nor the Log itself was in a similar state.
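To make that failure mode concrete, here is a minimal standalone sketch (plain Scala, not Kafka's actual {{TimeIndex}} class; all names are illustrative) of what 0 entries and 0 max possible entries imply: the "full" check passes trivially, and appending the final timestamp to an empty direct buffer throws exactly the {{BufferOverflowException}} from the stack trace above.
```
import java.nio.ByteBuffer

// Standalone sketch of the observed state (not Kafka's real code).
// maybeAppend does roughly mmap.putLong(timestamp); mmap.putInt(relativeOffset),
// so an empty memory map cannot accept even one entry.
object EmptyTimeIndexSketch {
  def main(args: Array[String]): Unit = {
    val entries    = 0
    val maxEntries = 0                       // state observed in the heap dump
    val mmap = ByteBuffer.allocateDirect(0)  // stand-in for the empty memory map

    // 0 entries == 0 max entries => the index reports itself as full,
    // which is what drives the segment roll / onBecomeInactiveSegment path.
    println(s"index considered full: ${entries >= maxEntries}")

    // The final-timestamp append then fails exactly like in the log.
    try {
      mmap.putLong(System.currentTimeMillis()) // throws java.nio.BufferOverflowException
      mmap.putInt(42)
    } catch {
      case e: java.nio.BufferOverflowException => println(s"append failed: $e")
    }
  }
}
```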
According to the source code, the only way to get into a state where the TimeIndex has 0 entries and 0 max possible entries is a call to {{resize(0)}} or {{resize(entrySize * _entries)}}. Unfortunately, I couldn't find a call path that would lead to such a state. These calls are made either for both the OffsetIndex and the TimeIndex at the same time (but we don't observe a similar state for the OffsetIndex in the memory dump) or during loading of the segment on Kafka server startup (which is not possible for a newly rolled segment).

The {{state-change.log}} was not useful since it didn't contain any state-changing events around that time, so the Kafka cluster and that particular broker considered themselves stable and healthy.

To summarize:
1) Some event, possibly a regularly scheduled one, coincidentally happens after a new segment is rolled.
2) That event resizes the TimeIndex to size 0 with 0 max possible entries and reinitializes the memory map, making it zero-length.
3) After that, the TimeIndex is considered full, but there is no way to add the final timestamp to the empty memory map.
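Regarding point 2 and the {{resize}} path mentioned above: here is a rough sketch of what I understand {{resize(entrySize * _entries)}} to do when {{_entries}} is 0, based on my reading of {{AbstractIndex.resize}} in 2.3.1 (truncate the backing file to the new size and re-map it). The helper, file name and entry size below are made up for the example, not taken from the broker.
```
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Illustrative sketch only: a resize to entrySize * 0 bytes leaves the index
// with a zero-length memory map and maxEntries == 0.
object ResizeToZeroSketch {
  val entrySize = 12 // TimeIndex entry: 8-byte timestamp + 4-byte relative offset

  def resize(file: File, newSizeBytes: Int): (MappedByteBuffer, Int) = {
    val raf = new RandomAccessFile(file, "rw")
    try {
      raf.setLength(newSizeBytes) // truncate the backing index file
      val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes)
      (mmap, newSizeBytes / entrySize) // (memory map, max possible entries)
    } finally raf.close()
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("00000000000013886052", ".timeindex")
    val entries = 0
    val (mmap, maxEntries) = resize(file, entrySize * entries)
    println(s"capacity=${mmap.capacity}, maxEntries=$maxEntries, full=${entries >= maxEntries}")
  }
}
```
Running it prints capacity=0, maxEntries=0, full=true, which would match the state found in the memory dump.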
> BufferOverflowException on rolling new segment after upgrading Kafka from 1.1.0 to 2.3.1
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9213
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 2.3.1
>         Environment: Ubuntu 16.04, AWS instance d2.8xlarge.
> JAVA Options:
> -Xms16G
> -Xmx16G
> -XX:G1HeapRegionSize=16M
> -XX:MetaspaceSize=96m
> -XX:MinMetaspaceFreeRatio=50
>            Reporter: Daniyar
>            Priority: Blocker
>
> We updated our Kafka cluster from version 1.1.0 to 2.3.1.
> We followed up to step 2 of the [upgrade instructions|https://kafka.apache.org/documentation/#upgrade].
> Message format and inter-broker protocol versions were left the same:
> inter.broker.protocol.version=1.1
> log.message.format.version=1.1
>
> After upgrading, we started to get occasional exceptions:
> {code:java}
> 2019/11/19 05:30:53 INFO [ProducerStateManager partition=matchmaker_retry_clicks_15m-2] Writing producer snapshot at offset 788532 (kafka.log.ProducerStateManager)
> 2019/11/19 05:30:53 INFO [Log partition=matchmaker_retry_clicks_15m-2, dir=/mnt/kafka] Rolled new log segment at offset 788532 in 1 ms. (kafka.log.Log)
> 2019/11/19 05:31:01 ERROR [ReplicaManager broker=0] Error processing append operation on partition matchmaker_retry_clicks_15m-2 (kafka.server.ReplicaManager)
> 2019/11/19 05:31:01 java.nio.BufferOverflowException
> 2019/11/19 05:31:01 at java.nio.Buffer.nextPutIndex(Buffer.java:527)
> 2019/11/19 05:31:01 at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
> 2019/11/19 05:31:01 at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
> 2019/11/19 05:31:01 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> 2019/11/19 05:31:01 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01 at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
> 2019/11/19 05:31:01 at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
> 2019/11/19 05:31:01 at scala.Option.foreach(Option.scala:407)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
> 2019/11/19 05:31:01 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01 at kafka.log.Log.roll(Log.scala:1654)
> 2019/11/19 05:31:01 at kafka.log.Log.maybeRoll(Log.scala:1639)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$append$2(Log.scala:966)
> 2019/11/19 05:31:01 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01 at kafka.log.Log.append(Log.scala:850)
> 2019/11/19 05:31:01 at kafka.log.Log.appendAsLeader(Log.scala:819)
> 2019/11/19 05:31:01 at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
> 2019/11/19 05:31:01 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
> 2019/11/19 05:31:01 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
> 2019/11/19 05:31:01 at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
> 2019/11/19 05:31:01 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> 2019/11/19 05:31:01 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
> 2019/11/19 05:31:01 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
> 2019/11/19 05:31:01 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
> 2019/11/19 05:31:01 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
> 2019/11/19 05:31:01 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
> 2019/11/19 05:31:01 at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> 2019/11/19 05:31:01 at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> 2019/11/19 05:31:01 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> 2019/11/19 05:31:01 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
> 2019/11/19 05:31:01 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
> 2019/11/19 05:31:01 at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
> 2019/11/19 05:31:01 at kafka.server.KafkaApis.handle(KafkaApis.scala:113)
> 2019/11/19 05:31:01 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> 2019/11/19 05:31:01 at java.lang.Thread.run(Thread.java:748)
> {code}
> The error persists until the broker gets restarted (or leadership gets moved to another broker).
>
> Broker config:
> {code:java}
> advertised.host.name={{ hostname }}
> port=9092
> # Default number of partitions if a value isn't set when the topic is created.
> num.partitions=3
> auto.create.topics.enable=false
> delete.topic.enable=false
> # Prevent not in-sync replica to become a leader.
> unclean.leader.election.enable=false
> # The number of threads per data directory to be used for log recovery at
> # startup and flushing at shutdown.
> num.recovery.threads.per.data.dir=36
> log.flush.interval.messages=10000
> log.flush.interval.ms=2000
> # 1 week
> log.retention.hours=168
> log.retention.check.interval.ms=300000
> log.cleaner.enable=false
> # Use broker time for message timestamps.
> log.message.timestamp.type=LogAppendTime
> zookeeper.connect={{zookeeper_host }}:2181
> zookeeper.connection.timeout.ms=6000
> controller.socket.timeout.ms=30000
> controller.message.queue.size=10
> # Replication configuration
> num.replica.fetchers=10
> # Socket server configuration
> num.io.threads=32
> num.network.threads=16
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=32
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> inter.broker.protocol.version=1.1
> log.message.format.version=1.1
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)