[ https://issues.apache.org/jira/browse/KAFKA-9213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983866#comment-16983866 ]

Daniyar commented on KAFKA-9213:
--------------------------------

UPDATE:

I've investigated another occurrence of this exception.

For the analysis, I used:
1) a memory dump taken from the broker around 6:37 pm
2) the kafka log file
3) the kafka state-change log
4) the log, index, and time-index files of the failed segment
5) the Kafka source code, versions 2.3.1 and 1.1.0

Here's how the exception looks in the kafka log:

```
2019/11/19 16:03:00 INFO [ProducerStateManager partition=ad_group_metrics-62] Writing producer snapshot at offset 13886052 (kafka.log.ProducerStateManager)
2019/11/19 16:03:00 INFO [Log partition=ad_group_metrics-62, dir=/mnt/kafka] Rolled new log segment at offset 13886052 in 1 ms. (kafka.log.Log)
2019/11/19 16:03:00 ERROR [ReplicaManager broker=17] Error processing append operation on partition ad_group_metrics-62 (kafka.server.ReplicaManager)
2019/11/19 16:03:00 java.nio.BufferOverflowException
2019/11/19 16:03:00 at java.nio.Buffer.nextPutIndex(Buffer.java:527)
2019/11/19 16:03:00 at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
2019/11/19 16:03:00 at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
2019/11/19 16:03:00 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00 at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
2019/11/19 16:03:00 at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
2019/11/19 16:03:00 at scala.Option.foreach(Option.scala:407)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00 at kafka.log.Log.roll(Log.scala:1654)
2019/11/19 16:03:00 at kafka.log.Log.maybeRoll(Log.scala:1639)
2019/11/19 16:03:00 at kafka.log.Log.$anonfun$append$2(Log.scala:966)
2019/11/19 16:03:00 at kafka.log.Log.maybeHandleIOException(Log.scala:2085)
2019/11/19 16:03:00 at kafka.log.Log.append(Log.scala:850)
2019/11/19 16:03:00 at kafka.log.Log.appendAsLeader(Log.scala:819)
2019/11/19 16:03:00 at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
2019/11/19 16:03:00 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
2019/11/19 16:03:00 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
2019/11/19 16:03:00 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
2019/11/19 16:03:00 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
2019/11/19 16:03:00 at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
2019/11/19 16:03:00 at scala.collection.TraversableLike.map(TraversableLike.scala:238)
2019/11/19 16:03:00 at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
2019/11/19 16:03:00 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
2019/11/19 16:03:00 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
2019/11/19 16:03:00 at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
2019/11/19 16:03:00 at kafka.server.KafkaApis.handle(KafkaApis.scala:113)
2019/11/19 16:03:00 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
2019/11/19 16:03:00 at java.lang.Thread.run(Thread.java:748)

...

```

What we see here is that a new segment was rolled at offset 13886052, and then an
exception occurred while marking _some_ segment as inactive
({{onBecomeInactiveSegment}}) during an append of new messages to the Log. The
exact timing between rolling the new segment and appending new messages doesn't
seem to matter: there are many similar exceptions that occur a few seconds after a
new segment was rolled.
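
To make the top of the stack trace concrete, here is a minimal, self-contained
sketch (plain JDK NIO, not Kafka code; the object name is just for illustration)
showing that a relative {{putLong}} on a direct ByteBuffer with no remaining space
throws exactly this {{java.nio.BufferOverflowException}}:

```
import java.nio.ByteBuffer

object OverflowDemo {
  def main(args: Array[String]): Unit = {
    // Models an empty memory map: a direct buffer with zero capacity.
    val buf = ByteBuffer.allocateDirect(0)
    // Relative put with no remaining space -> java.nio.BufferOverflowException,
    // matching the Buffer.nextPutIndex / DirectByteBuffer.putLong frames above.
    buf.putLong(System.currentTimeMillis())
  }
}
```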

I managed to find the {{LogSegment}} object for offset 13886052 in the memory dump.
Following the source code logic and checking the LogSegment state against the Kafka
logs, I found that the {{TimeIndex}} object had somehow gotten into a state with 0
entries and 0 max possible entries (and an empty memory map). Having 0 entries is
normal for a TimeIndex or OffsetIndex, even when there are already some records in
the Log, until their size passes some threshold. But having 0 max possible entries
along with 0 entries caused the TimeIndex to be considered full (0 entries == 0 max
entries), which kept triggering the roll of a new segment. The Log then tried to
append a final timestamp to the TimeIndex and failed because of the empty memory
map (BufferOverflowException). Neither the OffsetIndex nor the Log's messages were
in a similar state.
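
Here is a minimal sketch of that failure mode, assuming (as the stack trace
suggests) that a "full" time index triggers a roll and that sealing the segment
appends a final entry into the index's memory map. The class and method names only
mirror the stack trace for readability; this is not the actual Kafka implementation:

```
import java.nio.ByteBuffer

// Hypothetical, simplified model of a time index backed by a memory map.
final class TimeIndexSketch(mmap: ByteBuffer, entrySize: Int = 12) {
  def entries: Int = mmap.position() / entrySize
  def maxEntries: Int = mmap.limit() / entrySize
  def isFull: Boolean = entries >= maxEntries
  // Appends a final (timestamp, relativeOffset) entry when the segment is sealed.
  def maybeAppend(timestamp: Long, relativeOffset: Int): Unit = {
    mmap.putLong(timestamp)    // overflows on a 0-byte map
    mmap.putInt(relativeOffset)
  }
}

object FullButEmptySketch {
  def main(args: Array[String]): Unit = {
    // The corrupted state observed in the dump: 0 entries, 0 max entries, empty map.
    val corrupted = new TimeIndexSketch(ByteBuffer.allocateDirect(0))
    println(s"isFull = ${corrupted.isFull}")                // true: 0 == 0
    if (corrupted.isFull)                                    // so a roll is triggered...
      corrupted.maybeAppend(System.currentTimeMillis(), 0)   // ...and this throws
  }
}
```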

According to the source code, the only way to get into a state where the TimeIndex
has 0 entries and 0 max possible entries is to call {{resize(0)}} or
{{resize(entrySize * _entries)}}. Unfortunately, I couldn't find a code path that
calls them in a way that could lead to such a state. These calls are made either
for both the OffsetIndex and the TimeIndex at the same time (but we don't observe a
similar state for the OffsetIndex in the memory dump) or while loading the segment
on Kafka server startup (which is not possible for a newly rolled segment).
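
For illustration, here is a hedged sketch of why {{resize(0)}} (or
{{resize(entrySize * _entries)}} with 0 entries) would produce the observed state,
assuming the index derives its max possible entries from the mapped length divided
by the entry size; the names and types are illustrative, not the real Kafka fields:

```
object ResizeSketch {
  val entrySize = 12 // one TimeIndex entry: 8-byte timestamp + 4-byte relative offset

  // Hypothetical view of an index: backing map length plus the current entry count.
  final case class IndexState(lengthBytes: Int, entries: Int) {
    def maxEntries: Int = lengthBytes / entrySize
    def isFull: Boolean = entries >= maxEntries
  }

  // A resize only changes the backing map length; the entry count is unchanged.
  def resize(s: IndexState, newSizeBytes: Int): IndexState =
    s.copy(lengthBytes = newSizeBytes)

  def main(args: Array[String]): Unit = {
    val healthy = IndexState(lengthBytes = 12 * 1024 * 1024, entries = 0)
    println(healthy.isFull)                                     // false: lots of headroom
    val broken = resize(healthy, 0)                             // resize(0), or resize(entrySize * 0)
    println((broken.entries, broken.maxEntries, broken.isFull)) // (0, 0, true)
  }
}
```

Under that assumption the index ends up simultaneously empty and "full", which
matches what the memory dump shows.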

The {{state-change.log}} was not useful, since it didn't contain any state-changing
events around that time. So we can say that the Kafka cluster, and that particular
broker, considered themselves stable and healthy.

To summarize:
1) Some event, possibly a regularly scheduled one, happens shortly after a new
segment is rolled.
2) That event resizes the TimeIndex to 0 bytes, leaving it with 0 max possible
entries, and reinitializes the memory map to size 0.
3) After that, the TimeIndex is considered full, yet there is no room left to
append the final timestamp to the empty memory map.

 

> BufferOverflowException on rolling new segment after upgrading Kafka from 
> 1.1.0 to 2.3.1
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9213
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 2.3.1
>         Environment: Ubuntu 16.04, AWS instance d2.8xlarge.
> JAVA Options:
> -Xms16G 
> -Xmx16G 
> -XX:G1HeapRegionSize=16M 
> -XX:MetaspaceSize=96m 
> -XX:MinMetaspaceFreeRatio=50 
>            Reporter: Daniyar
>            Priority: Blocker
>
> We updated our Kafka cluster from version 1.1.0 to 2.3.1. We followed up to step 2
> of the [upgrade instructions|https://kafka.apache.org/documentation/#upgrade].
> Message format and inter-broker protocol versions were left the same:
> inter.broker.protocol.version=1.1
> log.message.format.version=1.1
>  
> After upgrading, we started to get some occasional exceptions:
> {code:java}
> 2019/11/19 05:30:53 INFO [ProducerStateManager
> partition=matchmaker_retry_clicks_15m-2] Writing producer snapshot at
> offset 788532 (kafka.log.ProducerStateManager)
> 2019/11/19 05:30:53 INFO [Log partition=matchmaker_retry_clicks_15m-2,
> dir=/mnt/kafka] Rolled new log segment at offset 788532 in 1 ms.
> (kafka.log.Log)
> 2019/11/19 05:31:01 ERROR [ReplicaManager broker=0] Error processing append
> operation on partition matchmaker_retry_clicks_15m-2
> (kafka.server.ReplicaManager)
> 2019/11/19 05:31:01 java.nio.BufferOverflowException
> 2019/11/19 05:31:01     at java.nio.Buffer.nextPutIndex(Buffer.java:527)
> 2019/11/19 05:31:01     at
> java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
> 2019/11/19 05:31:01     at
> kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
> 2019/11/19 05:31:01     at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> 2019/11/19 05:31:01     at
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01     at
> kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
> 2019/11/19 05:31:01     at
> kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
> 2019/11/19 05:31:01     at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
> 2019/11/19 05:31:01     at
> kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
> 2019/11/19 05:31:01     at scala.Option.foreach(Option.scala:407)
> 2019/11/19 05:31:01     at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
> 2019/11/19 05:31:01     at
> kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01     at kafka.log.Log.roll(Log.scala:1654)
> 2019/11/19 05:31:01     at kafka.log.Log.maybeRoll(Log.scala:1639)
> 2019/11/19 05:31:01     at kafka.log.Log.$anonfun$append$2(Log.scala:966)
> 2019/11/19 05:31:01     at
> kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01     at kafka.log.Log.append(Log.scala:850)
> 2019/11/19 05:31:01     at kafka.log.Log.appendAsLeader(Log.scala:819)
> 2019/11/19 05:31:01     at
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
> 2019/11/19 05:31:01     at
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01     at
> kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
> 2019/11/19 05:31:01     at
> kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
> 2019/11/19 05:31:01     at
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
> 2019/11/19 05:31:01     at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> 2019/11/19 05:31:01     at
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
> 2019/11/19 05:31:01     at
> scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
> 2019/11/19 05:31:01     at
> scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
> 2019/11/19 05:31:01     at
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
> 2019/11/19 05:31:01     at
> scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
> 2019/11/19 05:31:01     at
> scala.collection.TraversableLike.map(TraversableLike.scala:238)
> 2019/11/19 05:31:01     at
> scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> 2019/11/19 05:31:01     at
> scala.collection.AbstractTraversable.map(Traversable.scala:108)
> 2019/11/19 05:31:01     at
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
> 2019/11/19 05:31:01     at
> kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
> 2019/11/19 05:31:01     at
> kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
> 2019/11/19 05:31:01     at
> kafka.server.KafkaApis.handle(KafkaApis.scala:113)
> 2019/11/19 05:31:01     at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> 2019/11/19 05:31:01     at java.lang.Thread.run(Thread.java:748)
> {code}
> The error persists until the broker gets restarted (or leadership is moved to
> another broker).
>  
> Brokers config:
> {code:java}
> advertised.host.name={{ hostname }}
> port=9092
> # Default number of partitions if a value isn't set when the topic is created.
> num.partitions=3
> auto.create.topics.enable=false
> delete.topic.enable=false
> # Prevent not in-sync replica to become a leader.
> unclean.leader.election.enable=false
> # The number of threads per data directory to be used for log recovery at
> # startup and flushing at shutdown.
> num.recovery.threads.per.data.dir=36
> log.flush.interval.messages=10000
> log.flush.interval.ms=2000
> # 1 week
> log.retention.hours=168
> log.retention.check.interval.ms=300000
> log.cleaner.enable=false
> # Use broker time for message timestamps.
> log.message.timestamp.type=LogAppendTime
> zookeeper.connect={{zookeeper_host }}:2181
> zookeeper.connection.timeout.ms=6000
> controller.socket.timeout.ms=30000
> controller.message.queue.size=10
> # Replication configuration
> num.replica.fetchers=10
> # Socket server configuration
> num.io.threads=32
> num.network.threads=16
> socket.request.max.bytes=104857600
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> queued.max.requests=32
> fetch.purgatory.purge.interval.requests=100
> producer.purgatory.purge.interval.requests=100
> inter.broker.protocol.version=1.1
> log.message.format.version=1.1
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
