Rekha Joshi created KAFKA-3238:
----------------------------------

             Summary: Deadlock: MirrorMaker consumer not fetching any messages
                 Key: KAFKA-3238
                 URL: https://issues.apache.org/jira/browse/KAFKA-3238
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.0
            Reporter: Rekha Joshi


Hi,

We have been seeing a recurring issue with mirroring between our data centers,
happening at random times. Below are the details.

Thanks
Rekha

{code}
Source: AWS (13 Brokers)
Destination: OTHER-DC (20 Brokers)
Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
Connectivity: AWS Direct Connect (max 6Gbps)
Data details: Source is receiving 40,000 msg/sec, each message around 5 KB
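(As a rough check: 40,000 msg/sec x 5 KB is about 200 MB/s, i.e. roughly
1.6 Gbps sustained, well within the 6 Gbps Direct Connect limit.)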

Mirroring
------------

Node: at our OTHER-DC (24 cores, 48 GB memory)
JVM Details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
Launch script: kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --num.producers 1 --whitelist mirrortest --num.streams 1 --queue.size 100000

consumer.properties
---------------------------
zookeeper.connect=<host:port>
group.id=KafkaMirror
auto.offset.reset=smallest
fetch.message.max.bytes=9000000
zookeeper.connection.timeout.ms=60000
rebalance.max.retries=4
rebalance.backoff.ms=5000

producer.properties
--------------------------
metadata.broker.list=<host:port>
partitioner.class=<our custom round robin partitioner>
producer.type=async

When we start the mirroring job, everything works as expected.
Eventually we hit an issue where the job stops consuming altogether.
At this stage:

1. No errors are seen in the MirrorMaker logs.

2. Consumer threads are not fetching any messages, and their thread dumps
look as follows:

"ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread
t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <79b6d3ce> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awa
i
t(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350
)
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcher
T
hread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfu
n
$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfu
n
$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$m
c
V$sp(AbstractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(A
b
stractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(A
b
stractFetcherThread.scala:109)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThr
e
ad.scala:108)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Locked ownable synchronizers:
- locked <199dc92d> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)

3. The producer stops producing; in trace mode we see it is handling 0
events, and its thread dump looks as follows:

"ProducerSendThread--0" - Thread t@53
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <5ae2fc40> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at
kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:5
6
)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at
kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend
.
scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
c
er.scala:72)
- locked <8489cd8> (a java.lang.Object)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
$
mcV$sp(SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
(
SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
(
SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.sca
l
a:102)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
t
Handler$$send(DefaultEventHandler.scala:255)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$
2
.apply(DefaultEventHandler.scala:106)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$
2
.apply(DefaultEventHandler.scala:100)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(Trav
e
rsableLike.scala:772)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
8
)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
8
)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226
)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala
:
771)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
e
ntHandler.scala:100)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
:
72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
a
la:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
o
ducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
o
ducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
s
cala:67)
at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

{code}
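
For illustration only (this is not MirrorMaker source; the class and thread
names below are hypothetical), here is a minimal Java sketch of the blocking
pattern the two dumps appear to show: the fetcher parks in
LinkedBlockingQueue.put() once the bounded data-chunk queue (--queue.size)
is full, because the producer send thread that normally drains it is stuck
in a socket write.

{code}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueStallSketch {

    // Tiny capacity so the stall reproduces quickly; MirrorMaker was run
    // with --queue.size 100000.
    private static final LinkedBlockingQueue<byte[]> chunkQueue =
            new LinkedBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    // Analogous to PartitionTopicInfo.enqueue ->
                    // LinkedBlockingQueue.put: once the queue is full and
                    // nobody takes from it, this call parks the thread
                    // indefinitely (WAITING on the queue's condition).
                    chunkQueue.put(new byte[5 * 1024]);
                    System.out.println("fetcher enqueued chunk " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ConsumerFetcherThread-sketch");

        Thread sender = new Thread(() -> {
            try {
                // Drain a few chunks, then simulate a send that never
                // returns (the dump shows ProducerSendThread stuck in
                // SocketChannel.write).
                for (int i = 0; i < 3; i++) {
                    chunkQueue.take();
                    System.out.println("sender drained chunk " + i);
                }
                System.out.println("sender stalled (simulated blocked socket write)");
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ProducerSendThread-sketch");

        fetcher.start();
        sender.start();

        // Shortly after the queue fills, the fetcher is parked in put()
        // and no further progress is made.
        TimeUnit.SECONDS.sleep(2);
        System.out.println("fetcher state: " + fetcher.getState());
        System.exit(0);
    }
}
{code}

Running this for a couple of seconds typically leaves the "fetcher" thread
WAITING inside put(), matching symptom (2) above: nothing errors out, the
job simply stops making progress.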



