[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-20 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260268#comment-16260268
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Yes.
Could the OutOfMemoryError stack trace that is thrown be made to give more
clarity about the root cause, i.e. the map count being exceeded in this case?

-Kaushik

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
> Attachments: config.json, kafkaServer-gc-agent06.7z, 
> kafkaServer-gc.log, kafkaServer-gc_agent03.log, kafkaServer-gc_agent04.log, 
> kafka_config.txt, map_counts_agent06, stderr_broker1.txt, stderr_broker2.txt, 
> stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing kafka with end to end pipe lines of,
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>   at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at 

[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-09 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245349#comment-16245349
 ] 

huxihx commented on KAFKA-6165:
---

Are you saying that the problem went away when you simply bumped up the log
segment size while keeping everything else unchanged? Hmmm... that may alleviate
the problem, since fewer index files need to be created.


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-08 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245321#comment-16245321
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Hi Huxihx,

Thanks for the feedback.

With
"log_segment_bytes": 3
"log_retention_check_interval_ms": 180
"log_retention_bytes": "75"
"log_retention_hours": 48
we have now observed for 20 hours and the brokers have not crashed so far.
The max map count doesn't seem to go beyond ~15000 now.
We will monitor for some more time and share if there are failures.

In general we have the questions and observations below, and need your
recommendations to tune our setup.

1. log.segment.bytes and log_retention_bytes apply across all topics and
partitions.
In our case, a few topics have very high throughput and a few very low throughput.
What would be the recommended way to set these two parameters considering the
wide range of data rates across topics?
2. log_retention_check_interval_ms is set to 30 mins now.
Would there be any extra memory overhead on the brokers if this value is very
low (i.e. the check runs very frequently)?
Since topics have different data ingestion rates, the rate of log file creation
is not the same across partitions.
What is the best approach to decide on this setting?
3. We observed the map counts of the Kafka process:
the max value on one of the brokers is ~15000 and drops back down to ~8000 over
a span of 1.5 hours.
Is this OK from the GC point of view, or can anything more be optimised here?
Attaching the GC log file for one of the brokers.
4. Since the fix appears to be the log.segment.bytes config change, is it OK to
reduce the Java heap space back to 8GB (that's where we started from)?
What's the recommendation for the Java heap space config in cases like ours?
5. Also, from a user point of view, is it possible to have clearer error stack
traces that make it obvious the OutOfMemoryError was caused by exceeding the
map limit?

Attaching files,

Kafka latest config : [^config.json]
Map counts monitored on one of the brokers for 10-15 hrs: [^map_counts_agent06]
GC log file from one of the brokers: [^kafkaServer-gc-agent06.7z]
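
For reference, a minimal sketch of one way such map counts can be sampled,
assuming the broker JVM is the only process on the host matching "kafka.Kafka"
via pgrep (the actual collection method used for the attachment may differ):

{code}
# PID of the Kafka broker JVM (assumes a single broker process on this host)
PID=$(pgrep -f kafka.Kafka)

# Number of memory-mapped regions currently held by the broker;
# this is the count bounded by vm.max_map_count
wc -l < /proc/$PID/maps

# Sample the count once a minute into a log file
while true; do echo "$(date) $(wc -l < /proc/$PID/maps)"; sleep 60; done >> map_counts.log
{code}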

Thanks in advance,
Kaushik




[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-07 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16241745#comment-16241745
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Hi huxihx,

Thanks for the feedback.

We observed these issues initially when the heap size was 8GB as well. Do you
think 8GB is also too high a value for our profiles?
Any recommendation on how much to increase "vm.max_map_count" by?

We did not observe this issue when the throughput to Kafka was lower,
i.e. (messages/sec across all topics & partitions: 250k;
bytes in/sec across all topics & partitions: approx 50 MB/sec).

We started observing it with profiles like
(messages/sec across all topics & partitions: 600k;
bytes in/sec across all topics & partitions: approx 120 MB/sec).

Also find the GC logs of all three brokers attached.
broker 1: [^kafkaServer-gc.log]
broker 2: [^kafkaServer-gc_agent03.log]
broker 3: [^kafkaServer-gc_agent04.log]

Thanks & Regards,
-kaushik




[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-06 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16240437#comment-16240437
 ] 

huxihx commented on KAFKA-6165:
---

[~kaushik_srinivas] Since your heap size is quite large, the full GC frequency
is relatively low, so the direct (off-heap) memory gets fewer chances to be
reclaimed. I believe that might be the cause of this problem. FileChannelImpl
may try to free off-heap memory by invoking `System.gc()`, but Kafka configures
`-XX:+DisableExplicitGC` before 1.0.0, which disables that call.

There are two possible ways that could help:
1. Upgrade to 1.0.0, since
[KAFKA-5470|https://issues.apache.org/jira/browse/KAFKA-5470] replaced
`-XX:+DisableExplicitGC` with `-XX:+ExplicitGCInvokesConcurrent`.

2. Increase the OS-level limit `vm.max_map_count` by setting it to a higher
number. It's very likely that the default 65536 limit is being hit, so try to
bump up that number.
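
As a rough illustration of suggestion 2 (the value 262144 below is only an
example; the right number depends on how many segments and indexes each broker
maps), the limit could be checked and raised on each agent node roughly like
this:

{code}
# Current kernel limit on memory map areas per process
sysctl vm.max_map_count

# Raise it for the running kernel (does not survive a reboot)
sudo sysctl -w vm.max_map_count=262144

# Persist the new value across reboots
echo 'vm.max_map_count=262144' | sudo tee /etc/sysctl.d/99-kafka.conf
sudo sysctl --system
{code}

If staying on 0.11.0.x, the effect of KAFKA-5470 could also be approximated by
overriding the broker's JVM flags (for example via KAFKA_JVM_PERFORMANCE_OPTS,
which is where kafka-run-class.sh sets `-XX:+DisableExplicitGC` by default) so
that `-XX:+ExplicitGCInvokesConcurrent` is used instead; whether the DC/OS
package exposes that override is an assumption here.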


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-05 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239981#comment-16239981
 ] 

huxihx commented on KAFKA-6165:
---

[~ijuma] Do you think it's caused by the fact that `AbstractIndex.resize` does
not free the off-heap memory before creating new mapped memory regions, as shown
below:
{code:title=AbstractIndex.java|borderStyle=solid}
  def resize(newSize: Int) {
    inLock(lock) {
      val raf = new RandomAccessFile(file, "rw")
      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
      val position = mmap.position

      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if (OperatingSystem.IS_WINDOWS)
        forceUnmap(mmap);
      try {
        raf.setLength(roundedNewSize)
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit / entrySize
        mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
  }
{code}

It seems we only free the memory on the Windows platform; do you think we should
do the same as [KAFKA-4614|https://issues.apache.org/jira/browse/KAFKA-4614]?


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-04 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16239413#comment-16239413
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Observed the number of open file descriptors over a period of time on one of the
brokers:
cat /proc/sys/fs/file-nr
47488   0   39340038

Did not observe it exceeding the limit.
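
For completeness, /proc/sys/fs/file-nr reports the system-wide allocation; a
small sketch of checking the per-process limit and usage for the broker
(assuming the broker JVM can be found with `pgrep -f kafka.Kafka` and the
commands are run as the broker user or root) would be:

{code}
# PID of the Kafka broker JVM
PID=$(pgrep -f kafka.Kafka)

# Per-process open-file limit for the broker
grep 'Max open files' /proc/$PID/limits

# Number of file descriptors the broker currently has open
ls /proc/$PID/fd | wc -l
{code}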


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237563#comment-16237563
 ] 

Ismael Juma commented on KAFKA-6165:


The memory map is failing. Maybe you're running into a file descriptor limit?


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237555#comment-16237555
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Tried with 12GB of heap space.
Observed the Kafka brokers crashing again with:

[2017-11-03 08:02:12,424] FATAL [ReplicaFetcherThread-0-0], Disk error while replicating data for store_sales-15 (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaStorageException: I/O exception in append to log 'store_sales-15'
	at kafka.log.Log.append(Log.scala:349)
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
	at scala.Option.foreach(Option.scala:257)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
	at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.IOException: Map failed
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
	at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:61)
	at kafka.log.TimeIndex.<init>(TimeIndex.scala:55)
	at kafka.log.LogSegment.<init>(LogSegment.scala:68)
	at kafka.log.Log.roll(Log.scala:776)
	at kafka.log.Log.maybeRoll(Log.scala:742)
	at kafka.log.Log.append(Log.scala:405)
	... 16 more
Caused by: java.lang.OutOfMemoryError: Map failed
	at sun.nio.ch.FileChannelImpl.map0(Native Method)
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)

We were observing around 300k messages/sec on each broker (3 brokers) at the
time of the crash.


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237358#comment-16237358
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Sure, we will try reducing the heap size to 12GB.
Initially the config was an 8GB heap, but we then observed OutOfMemory issues
more frequently.
The broker was actually consuming around 10GB of heap, which is why the heap was
increased to 16GB.


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237205#comment-16237205
 ] 

huxihx commented on KAFKA-6165:
---

Could you try to decrease the heap size a little bit? 16GB seems to be quite 
large. 


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237189#comment-16237189
 ] 

kaushik srinivas commented on KAFKA-6165:
-

java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237168#comment-16237168
 ] 

huxihx commented on KAFKA-6165:
---

What's the bitness of the JVM?
