[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245321#comment-16245321 ]

kaushik srinivas commented on KAFKA-6165:
-----------------------------------------

Hi Huxihx,

Thanks for the feedback.

With:
"log_segment_bytes": 300000000
"log_retention_check_interval_ms": 1800000
"log_retention_bytes": "7500000000"
"log_retention_hours": 48
we have now observed for 20 hours and the brokers have not crashed so far.
The max map count doesn't seem to go beyond ~15,000 now.
We will monitor for some more time and share if there are further failures.
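For what it's worth, the stabilised map count is roughly what a back-of-envelope estimate predicts. The sketch below uses the configs from this thread; the per-broker partition count is an assumption and should be adjusted to the actual cluster layout:

```shell
# Rough mmap estimate: each live log segment mmaps its offset index and
# (since 0.10) its time index. All numbers are assumptions taken from the
# configs discussed in this thread; adjust partitions_per_broker as needed.
retention_bytes=7500000000    # log.retention.bytes
segment_bytes=300000000       # log.segment.bytes
partitions_per_broker=300     # assumption, including replicas
maps_per_segment=2            # offset index + time index

segments_per_partition=$(( retention_bytes / segment_bytes + 1 ))
echo "segments per partition: ~$segments_per_partition"
echo "index mmaps per broker: ~$(( partitions_per_broker * segments_per_partition * maps_per_segment ))"
```

With these assumed numbers the estimate lands in the same ~15,000 ballpark the brokers show, which suggests the map count is now simply tracking the number of live segments.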

In general, we have the questions and observations below and would appreciate your
recommendations for tuning our setup.

1. log.segment.bytes and log.retention.bytes apply across all topics and
partitions.
In our case, a few topics have very high throughput and a few very low.
What would be the recommended way to set these two parameters, given the wide
range of data rates across topics?
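One option (a sketch, not something we have tested against this cluster) would be to keep conservative broker-wide defaults and override only the hot topics, since `kafka-configs.sh` supports per-topic `segment.bytes` and `retention.bytes`. The topic name, ZooKeeper address, and sizes below are placeholders:

```shell
# Per-topic override sketch for a high-throughput topic (values hypothetical).
# The --zookeeper form matches 0.11-era tooling.
ZK=localhost:2181          # assumption: your ZooKeeper connect string
TOPIC=store_sales          # hypothetical hot topic
CMD="kafka-configs.sh --zookeeper $ZK --alter --entity-type topics \
--entity-name $TOPIC --add-config segment.bytes=500000000,retention.bytes=20000000000"
echo "$CMD"                # printed for review; run it against a live cluster to apply
```

That way low-throughput topics keep smaller segments (and fewer wasted index mmaps), while the heavy topics get larger ones.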
2. log.retention.check.interval.ms is now set to 30 minutes.
Would there be any extra memory overhead on the brokers if this value were very
low (i.e. highly frequent checks)?
Since topics have different data ingestion rates, log files are not generated at
the same rate across partitions.
What is the best approach to deciding on this setting?
3. Observing the map counts of the Kafka process, we see that the maximum on one
of the brokers is ~15,000, dropping to ~8,000 over a span of 1.5 hours.
Is this okay from the GC point of view, or can anything more be optimised here?
Attaching the GC log file for one of the brokers.
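For reference, this is how the map count can be sampled against the kernel ceiling on a Linux host (the `pgrep -f kafka.Kafka` pattern is an assumption about how the broker process is named):

```shell
# Compare the broker's current mmap count with vm.max_map_count (Linux only).
pid=$(pgrep -f kafka.Kafka 2>/dev/null | head -n1 || true)
pid=${pid:-$$}                      # fall back to this shell so the sketch runs anywhere
maps=$(wc -l < "/proc/$pid/maps")
limit=$(cat /proc/sys/vm/max_map_count)
echo "pid=$pid maps=$maps limit=$limit"
```

Note the mappings counted here live outside the Java heap, so they are largely invisible to GC; the rise and fall should track segment rolls and retention cleanup rather than collection cycles.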
4. Since this appears to come down to the log.segment.bytes change, is it okay to
reduce the Java heap back to 8 GB (where we started from)?
What is the recommended Java heap size in cases like ours?
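On the heap question: since the index mmaps sit outside the Java heap, shrinking the heap should not affect the map limit, and Kafka generally leans on the OS page cache rather than a large heap. A sketch of pinning the heap back to 8 GB via the env var read by the stock start scripts (the sizing mirrors the original setup here, and is an assumption rather than a tested recommendation):

```shell
# KAFKA_HEAP_OPTS is read by kafka-server-start.sh; 8 GB mirrors the original
# sizing in this thread -- an assumption, not a tuned recommendation.
export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
echo "$KAFKA_HEAP_OPTS"
```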
5. Also, from a user's point of view, would it be possible to have clearer stack
traces that distinguish an OutOfMemoryError caused by exceeding the map limit
from one caused by heap exhaustion?
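Related to that last point: as far as we understand, `OutOfMemoryError: Map failed` thrown from `FileChannelImpl.map` usually means `mmap` returned ENOMEM, most often because `vm.max_map_count` was hit rather than the heap being full. Checking and, if needed, raising the limit is straightforward (262144 below is just a commonly seen example value, not a tuned number):

```shell
# Check the kernel's mmap ceiling (default is 65530 on many kernels).
cat /proc/sys/vm/max_map_count
# To raise it (requires root; persist via /etc/sysctl.conf or sysctl.d):
#   sysctl -w vm.max_map_count=262144    # example value only
```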

Attaching files:

Latest Kafka config: [^config.json]
Map counts monitored on one of the brokers for 10-15 hrs: [^map_counts_agent06]
GC log file from one of the brokers: [^kafkaServer-gc-agent06.7z]

Thanks in advance,
Kaushik



> Kafka Brokers goes down with outOfMemoryError.
> ----------------------------------------------
>
>                 Key: KAFKA-6165
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6165
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>         Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>            Reporter: kaushik srinivas
>         Attachments: config.json, kafkaServer-gc-agent06.7z, 
> kafkaServer-gc.log, kafkaServer-gc_agent03.log, kafkaServer-gc_agent04.log, 
> kafka_config.txt, map_counts_agent06, stderr_broker1.txt, stderr_broker2.txt, 
> stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "1000012"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "50000000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "5000000000"(5GB)
> Data Producer to kafka Throughput:
> message rate : approx 5 lakh (500,000) messages/sec across all the 3 brokers 
> and topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>       at kafka.log.Log.append(Log.scala:349)
>       at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>       at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>       at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>       at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>       at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>       at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>       at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>       at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>       at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>       at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>       at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>       at kafka.log.Log.roll(Log.scala:771)
>       at kafka.log.Log.maybeRoll(Log.scala:742)
>       at kafka.log.Log.append(Log.scala:405)
>       ... 22 more
> Caused by: java.lang.OutOfMemoryError: Map failed
>       at sun.nio.ch.FileChannelImpl.map0(Native Method)
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
>       ... 34 more
>       
> Issue 2 :
> stack trace :
> [2017-11-02 23:55:49,602] FATAL [ReplicaFetcherThread-0-0], Disk error while 
> replicating data for catalog_sales-3 (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'catalog_sales-3'
>       at kafka.log.Log.append(Log.scala:349)
>       at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
>       at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Caused by: java.io.IOException: Map failed
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>       at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>       at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>       at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>       at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>       at kafka.log.Log.roll(Log.scala:771)
>       at kafka.log.Log.maybeRoll(Log.scala:742)
>       at kafka.log.Log.append(Log.scala:405)
>       ... 16 more
> Caused by: java.lang.OutOfMemoryError: Map failed
>       at sun.nio.ch.FileChannelImpl.map0(Native Method)
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
>       ... 28 more
>       
> These two exceptions are happening continuously across all 3 brokers with the 
> same Kafka configuration.
> The brokers die with these exceptions.
> Attached the log files for 2 issues of  two brokers.
> Also attached is the kafka configuration json data being used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
