[ 
https://issues.apache.org/jira/browse/KAFKA-8673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891414#comment-16891414
 ] 

Guozhang Wang commented on KAFKA-8673:
--------------------------------------

Hello [~varsha.abhinandan] I looked into your stack trace and I think there are 
two issues here:

1. Threads that are parked in KafkaProducer.sendOffsetsToTransaction should not 
park forever: although TxnRequestHandler retries indefinitely on request 
timeouts / node disconnects, once the brokers are back online the requests 
should be answered and TransactionalRequestResult.await should then return. 
From the source code I cannot find any reason why this is not happening; 
maybe it needs some time to clear all the re-enqueued requests, but it 
should not block forever. Could you confirm that even after all brokers are up 
and running and handling requests normally, these threads remain blocked on 
the call for a very long time?

2. Other threads whose join-group requests are waiting on these blocked 
threads: this should now be resolved as part of 
https://issues.apache.org/jira/browse/KAFKA-6399, which is in 2.3.0. In other 
words, when some threads are blocked, they will still be kicked out of the 
group once the rebalance timeout expires, since that timeout is no longer 
infinite.
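
To illustrate item 1, here is a minimal sketch of the waiting pattern visible 
in the stack trace (a thread parked on a CountDownLatch until another thread 
counts it down). PendingResult and LatchAwaitSketch are hypothetical names 
made up for this example, not the Kafka classes, and the bounded wait is only 
one possible way to keep a caller from parking indefinitely when no response 
is ever handled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchAwaitSketch {

    /** Hypothetical stand-in for a latch-backed request result; not the Kafka class. */
    static final class PendingResult {
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Called by the sender thread once a response has been handled. */
        void done() {
            latch.countDown();
        }

        /**
         * Bounded wait: returns true if done() was called before the timeout,
         * false if the timeout elapsed or the waiting thread was interrupted.
         */
        boolean await(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // No response ever arrives: the bounded wait gives up instead of parking forever.
        PendingResult neverAnswered = new PendingResult();
        System.out.println("completed without response: "
                + neverAnswered.await(100, TimeUnit.MILLISECONDS));

        // A sender thread completes the result: the waiter is released.
        PendingResult answered = new PendingResult();
        Thread sender = new Thread(answered::done, "sender");
        sender.start();
        System.out.println("completed after response: "
                + answered.await(5, TimeUnit.SECONDS));
        sender.join();
    }
}
```

With an untimed await() in place of the bounded one, the first case would stay 
parked in the WAITING (parking) state shown in the thread dump until some 
thread called done().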

> Kafka stream threads stuck while sending offsets to transaction preventing 
> join group from completing
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8673
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>    Affects Versions: 2.2.0
>            Reporter: Varsha Abhinandan
>            Priority: Major
>         Attachments: Screen Shot 2019-07-11 at 12.08.09 PM.png
>
>
> We observed a deadlock kind of a situation in our Kafka streams application 
> when we accidentally shut down all the brokers. The Kafka cluster was brought 
> back in about an hour. 
> Observations made :
>  # Normal Kafka producers and consumers started working fine after the 
> brokers were up again. 
>  # The Kafka streams applications were stuck in the "rebalancing" state.
>  # The Kafka streams apps have exactly-once semantics enabled.
>  # The stack trace showed most of the stream threads sending the join group 
> requests to the group co-ordinator
>  # Few stream threads couldn't initiate the join group request since the call 
> to 
> [org.apache.kafka.clients.producer.KafkaProducer#sendOffsetsToTransaction|https://jira.corp.appdynamics.com/browse/ANLYTCS_ES-2062#sendOffsetsToTransaction%20which%20was%20hung]
>  was stuck.
>  # Seems like the join group requests were getting parked at the coordinator 
> since the expected members hadn't sent their own group join requests
>  # And after the timeout, the stream threads that were not stuck sent a new 
> join group requests.  
>  # Maybe (6) and (7) is happening infinitely
>  # Sample values of the GroupMetadata object on the group co-ordinator - 
> [^Screen Shot 2019-07-11 at 12.08.09 PM.png]
>  # The list of notYetJoinedMembers client id's matched with the threads 
> waiting for their offsets to be committed. 
> {code:java}
> [List(MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer-efa41349-3da1-43b6-9710-a662f68c63b1,
>  
> clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer,
>  clientHost=/10.136.98.48, sessionTimeoutMs=15000, 
> rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), 
> MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer-7cc8e41b-ad98-4006-a18a-b22abe6350f4,
>  
> clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer,
>  clientHost=/10.136.103.148, sessionTimeoutMs=15000, 
> rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), 
> MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer-9ffb96c1-3379-4cbd-bee1-5d4719fe6c9d,
>  
> clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer,
>  clientHost=/10.136.98.48, sessionTimeoutMs=15000, 
> rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), 
> MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer-5b8a1f1f-84dd-4a87-86c8-7542c0e50d1f,
>  
> clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer,
>  clientHost=/10.136.103.148, sessionTimeoutMs=15000, 
> rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), 
> MemberMetadata(memberId=metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33-consumer-3cb67ec9-c548-4386-962d-64d9772bf719,
>  
> clientId=metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33-consumer,
>  clientHost=/10.136.99.15, sessionTimeoutMs=15000, 
> rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ))]
> vabhinandan-mac:mp-jstack varsha.abhinandan$ cat jstack.* | grep 
> "metric-extractor-stream-c1-" | grep "StreamThread-" | grep "waiting on 
> condition"
> "metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36"
>  #128 daemon prio=5 os_prio=0 tid=0x00007fc53c047800 nid=0xac waiting on 
> condition [0x00007fc4e68e7000]
> "metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21"
>  #93 daemon prio=5 os_prio=0 tid=0x00007fc53c2b5800 nid=0x9d waiting on 
> condition [0x00007fc4e77f6000]
> "metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33"
>  #125 daemon prio=5 os_prio=0 tid=0x00007fe18017c800 nid=0xbc waiting on 
> condition [0x00007fe12e7e8000]
> "metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38"
>  #154 daemon prio=5 os_prio=0 tid=0x00007f27c4225800 nid=0xc4 waiting on 
> condition [0x00007f2772bec000]
> "metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27"
>  #143 daemon prio=5 os_prio=0 tid=0x00007f27c4365800 nid=0xb9 waiting on 
> condition [0x00007f27736f7000]
> {code}
> 11. Sample Stream Thread stuck - 
> {noformat}
> "metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36"
>  #128 daemon prio=5 os_prio=0 tid=0x00007fc53c047800 nid=0xac waiting on 
> condition [0x00007fc4e68e7000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0000000723587580> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:675)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:487)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:459)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
