[jira] [Comment Edited] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2016-02-12 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145519#comment-15145519
 ] 

Rekha Joshi edited comment on KAFKA-914 at 2/12/16 11:16 PM:

Hi,

We have been seeing a consistent issue mirroring between our data centers, and
the same issue seems to resurface. Below are the details. Is this concern really
resolved?

Thanks
Rekha

{code}
Source: AWS (13 brokers)
Destination: OTHER-DC (20 brokers)
Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
Connectivity: AWS Direct Connect (max 6 Gbps)
Data details: source is receiving 40,000 msg/sec, each message is around 5 KB

Mirroring
---------
Node: at our OTHER-DC (24 cores, 48 GB memory)
JVM details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
Launch script: kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --num.producers 1 --whitelist mirrortest --num.streams 1 --queue.size 10

consumer.properties
-------------------
zookeeper.connect=
group.id=KafkaMirror
auto.offset.reset=smallest
fetch.message.max.bytes=900
zookeeper.connection.timeout.ms=6
rebalance.max.retries=4
rebalance.backoff.ms=5000

producer.properties
-------------------
metadata.broker.list=
partitioner.class=
producer.type=async
{code}
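
For scale, a back-of-the-envelope check from the numbers above (assuming the stated averages): 40,000 msg/sec x ~5 KB/msg is roughly 200 MB/sec, or about 1.6 Gbps, well within the 6 Gbps Direct Connect ceiling, so raw link capacity by itself should not be what stalls the mirror.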
When we start the mirroring job, everything works as expected. Eventually we hit an issue where the job stops consuming altogether. At this stage:

1. No errors are seen in the MirrorMaker logs.

2. Consumer threads are not fetching any messages, and we see thread dumps as follows (see the queue sketch after the dumps):

{code}

"ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread
t@73
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <79b6d3ce> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awa
i
t(AbstractQueuedSynchronizer.java:2039)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350
)
at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcher
T
hread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfu
n
$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfu
n
$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$m
c
V$sp(AbstractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(A
b
stractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(A
b
stractFetcherThread.scala:109)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThr
e
ad.scala:108)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Locked ownable synchronizers:
- locked <199dc92d> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)

3. The producer stops producing; in trace mode we see it handling 0 events, with a thread dump as follows (see the socket-write sketch after the dumps):

{code}

"ProducerSendThread--0" - Thread t@53
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
- locked <5ae2fc40> (a java.lang.Object)
at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
at
kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:5
6
)
at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
at
kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend
.
scala:26)
at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
c
er.scala:72)
- locked <8489cd8> (a java.lang.Object)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
$
mcV$sp(SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
(
SyncProducer.scala:103)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply
(
SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
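
To make the consumer-side stall concrete, here is a minimal standalone sketch (not MirrorMaker code; the class and variable names are made up for illustration) of how a fetcher thread ends up parked forever in LinkedBlockingQueue.put() once a bounded data channel, like the one sized by --queue.size 10 above, fills up and nothing drains it:

{code}
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch, NOT MirrorMaker code: reproduces the shape of the
// ConsumerFetcherThread dump. Once a bounded queue fills and nothing drains
// it, put() parks the caller forever in
// AbstractQueuedSynchronizer$ConditionObject.await().
public class QueueStallSketch {
    public static void main(String[] args) throws InterruptedException {
        // --queue.size 10 bounds MirrorMaker's internal data channel;
        // model it here as a capacity-10 blocking queue.
        LinkedBlockingQueue<byte[]> channel = new LinkedBlockingQueue<>(10);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    channel.put(new byte[5 * 1024]); // ~5 KB messages, as in the report
                    System.out.println("enqueued " + (i + 1));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fetcher");
        fetcher.start();

        // Nobody calls channel.take() -- the draining side is stuck -- so after
        // 10 puts the fetcher thread is WAITING, exactly like the dump above.
        Thread.sleep(1000);
        System.out.println("fetcher state: " + fetcher.getState()); // WAITING
    }
}
{code}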
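The producer-side shape is the complement: a blocking SocketChannel.write() stops returning once the peer stops reading and the TCP buffers fill, leaving the writer RUNNABLE inside native writev0, as in the ProducerSendThread dump. A minimal self-contained sketch (again hypothetical names, not Kafka code):

{code}
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Minimal sketch, NOT Kafka code: the server accepts a connection but never
// reads from it, so the client's blocking write() eventually hangs once the
// kernel send/receive buffers fill -- same shape as the ProducerSendThread
// dump (RUNNABLE, stuck in native writev0).
public class BlockedWriteSketch {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel server =
                ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0));

        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        SocketChannel peer = server.accept(); // keep a reference; never read from it

        ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
        long total = 0;
        while (true) {                    // fills the TCP buffers...
            chunk.clear();
            total += client.write(chunk); // ...then blocks here forever
            System.out.println("bytes written: " + total);
        }
    }
}
{code}

One reading consistent with both dumps above: the send thread blocks in write(), the bounded queue backs up, and the fetcher parks in put(), wedging the whole pipeline with no errors logged.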

[jira] [Comment Edited] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2016-02-12 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145519#comment-15145519
 ] 

Rekha Joshi edited comment on KAFKA-914 at 2/13/16 1:20 AM:


Hi,
Facing a similar issue; raised in https://issues.apache.org/jira/browse/KAFKA-3238
Thanks
Rekha


