Hi,

I am using mirror maker from trunk to replicate data across two data centers.
While the destination broker was under heavy load and unresponsive, the send
rate of mirror maker was very low and the available producer buffer quickly
filled up. Eventually mirror maker threw an OOME. The detailed exception
can be found here:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1

I started mirror maker with 1G of memory and a 256M producer buffer. I used
Eclipse MAT to analyze the heap dump and found that the retained heap size
of all RecordBatch objects was more than 500MB. Half of that was data
waiting to be sent to the destination broker, which makes sense to me
as it is close to the 256MB producer buffer, but the other half was held
by kafka.tools.MirrorMaker$MirrorMakerProducerCallback: every producer
callback in mirror maker captures the message value and holds it until
the message is successfully delivered. In my case, since the destination
broker was very unresponsive, the message values held by the callbacks
stayed around indefinitely, which I think is wasteful and a major
contributor to the OOME. Screenshot from MAT:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png
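To illustrate what I mean, here is a minimal sketch of the retention pattern (this is NOT the actual MirrorMaker code; the Callback interface below is a simplified stand-in for org.apache.kafka.clients.producer.Callback, and the two factory methods are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackRetentionDemo {
    // Simplified stand-in for the producer callback interface.
    interface Callback {
        void onCompletion(Exception e);
    }

    // Pattern similar to MirrorMakerProducerCallback: the lambda captures
    // the value array, so it stays reachable until the send completes.
    static Callback retainingCallback(String topic, byte[] value) {
        return e -> {
            if (e != null) {
                // The value is only needed here, on failure, yet it is
                // pinned for the whole lifetime of the callback.
                System.err.println("failed to send " + value.length
                        + " bytes to " + topic);
            }
        };
    }

    // Alternative: capture only what the error path needs (e.g. the size),
    // so the value itself becomes unreachable as soon as the producer's
    // own RecordBatch copy is freed.
    static Callback lightweightCallback(String topic, int valueSize) {
        return e -> {
            if (e != null) {
                System.err.println("failed to send " + valueSize
                        + " bytes to " + topic);
            }
        };
    }

    public static void main(String[] args) {
        byte[] value = new byte[1024 * 1024];
        List<Callback> pending = new ArrayList<>();
        // The retaining form pins ~1MB per in-flight message.
        pending.add(retainingCallback("test", value));
        // The lightweight form pins only an int per in-flight message.
        pending.add(lightweightCallback("test", value.length));
        System.out.println("pending callbacks: " + pending.size());
    }
}
```

With many in-flight messages against an unresponsive broker, the retaining form effectively doubles the memory held per message (one copy in the producer's RecordBatch, one reference from the callback), which matches what I saw in the heap dump.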

The other interesting problem I observed is that when I turned on
unreachable-object parsing in MAT, more than 400MB of memory was occupied
by unreachable objects. It surprised me that GC didn't clean them up before
the OOME was thrown. As the GC log suggests
(https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L1),
Full GC was unable to reclaim any memory, and when facing an OOME these
unreachable objects should have been cleaned up. So either Eclipse MAT has
a problem parsing the heap dump, or there is a hidden memory leak that is
hard to find. A sample screenshot of the unreachable objects is here:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-objects-png

The consumer properties:

zookeeper.connect=zk
zookeeper.connection.timeout.ms=1000000
group.id=mm
auto.offset.reset=smallest
partition.assignment.strategy=roundrobin

The producer properties:

bootstrap.servers=brokers
client.id=mirror-producer
producer.type=async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
buffer.memory=268435456
batch.size=1048576
max.request.size=5242880
send.buffer.bytes=1048576

The java command to start mirror maker:
java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof
-XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
-Djava.awt.headless=true
-Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10M -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg
-Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.properties
-cp libs/* kafka.tools.MirrorMaker --consumer.config
consumer.properties --num.streams 10 --producer.config
producer.properties --whitelist test.*
