[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655934#comment-16655934 ]
Jason Gustafson commented on KAFKA-7519:
----------------------------------------

Good find and thanks for the investigation.

Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
-------------------------------------------------------------------------------------------------------------------

                 Key: KAFKA-7519
                 URL: https://issues.apache.org/jira/browse/KAFKA-7519
             Project: Kafka
          Issue Type: Bug
          Components: core, producer
            Reporter: Bridger Howell
            Assignee: Dhruvil Shah
            Priority: Critical
         Attachments: image-2018-10-18-13-02-22-371.png

After digging into a case where an exactly-once streams process was bizarrely unable to process incoming data, we observed the following:
* StreamThreads stalling while creating a producer, eventually resulting in no consumption by that streams process. Looking into those threads, we found they were stuck in a loop: sending InitProducerIdRequests, always receiving back the retriable error CONCURRENT_TRANSACTIONS, and trying again. These requests always had the same transactional id.
* After changing the streams process to not use exactly-once, it was able to process messages with no problems.
* Alternatively, after changing the applicationId for that streams process, it was able to process with no problems.
* Every hour, every broker would fail the task `transactionalId-expiration` with the following error:
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
    at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
    at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing transaction state transition to Dead while it already a pending state Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught exception in scheduled task transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class":"kafka.utils.Logging$class"}
{code}
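For context, here is a minimal sketch, not the actual Kafka source, of the kind of transition guard that raises the IllegalStateException above (the class and field names are illustrative stand-ins for TransactionMetadata and its pendingState): once a pending state is recorded for a transactional id, any further prepared transition is rejected until that pending state is cleared.
{code:scala}
// Illustrative sketch only (names simplified from the real TransactionMetadata):
// once pendingState is set, every later prepare* call on the same metadata fails
// until something clears it.
sealed trait TxnState
case object Ongoing extends TxnState
case object Dead extends TxnState

class TxnMetadataSketch(val transactionalId: String) {
  // In the real coordinator this is only cleared after the state change has been
  // successfully written to the transaction log.
  var pendingState: Option[TxnState] = None

  def prepareTransitionTo(newState: TxnState): Unit = {
    if (pendingState.isDefined)
      throw new IllegalStateException(
        s"Preparing transaction state transition to $newState while it already " +
          s"has a pending state ${pendingState.get}")
    pendingState = Some(newState)
  }

  def prepareDead(): Unit = prepareTransitionTo(Dead)
}
{code}
This matches the top two frames of the stack trace: prepareDead delegates to prepareTransitionTo, which finds a leftover pending state of Dead and throws.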
Based on these problems and having read a bit of the server source, I guessed that this would all be explained by there being TransactionMetadata instances that are stuck in a pendingState.

After doing a heap dump of the broker that was returning the error for our particular group, we found this:

!image-2018-10-18-13-02-22-371.png|width=723,height=224!

There were indeed a bunch of live TransactionMetadata instances that had a pending state of "Dead" but should have already been cleaned up, confirming my guess.
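This also lines up with the client-side symptom in the first bullet above. A hedged sketch of the connection, assuming (as the report suggests) that the coordinator answers requests for a transactional id with an unfinished pending transition using the retriable CONCURRENT_TRANSACTIONS error; the helper below is illustrative, not coordinator code, and only the Errors enum from kafka-clients is real:
{code:scala}
import org.apache.kafka.common.protocol.Errors

// Illustrative only: a transactional id whose pending state is never cleared keeps
// answering InitProducerId with a retriable error, so the producer (and the
// StreamThread creating it) retries the same request forever.
def initProducerIdResponseSketch(pendingState: Option[String]): Errors =
  pendingState match {
    case Some(_) => Errors.CONCURRENT_TRANSACTIONS // client backs off and retries
    case None    => Errors.NONE                    // normal path would continue
  }
{code}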
Finally, after reading carefully through the TransactionStateManager callback for producing tombstones for expired transactional ids, I noticed that if there is any error returned by the ReplicaManager, those transactions will _not_ have their pending state cleared.

----

h3. Short summary:

If the ReplicaManager fails to append the tombstone records for expiring a transactional id (in my case, this likely happened during a rebalance that wasn't properly rate limited), the broker fails to clear its pending state for that transactional id. This blocks any future actions on that transactional id (including cleanup) until the broker is restarted or another broker without that problem becomes the coordinator for that transactional id.

----

Related: there was a very similar case in KAFKA-5351, where not clearing a TransactionMetadata's pendingState caused similar issues.
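To make the summary concrete, here is a minimal, self-contained sketch of the failure pattern described above (all names are illustrative stand-ins; the real logic lives in the transactionalId-expiration task in TransactionStateManager): the pending Dead marker is set before the tombstone write, and the error branch of the write callback returns without resetting it.
{code:scala}
// Sketch of the reported bug, not the actual broker code.
object ExpirationSketch {
  // Stand-in for TransactionMetadata: only the pendingState handling matters here.
  final class Metadata(val transactionalId: String) {
    var pendingState: Option[String] = None
  }

  def expire(md: Metadata, appendTombstone: () => Either[Throwable, Unit]): Unit = {
    md.pendingState = Some("Dead")      // prepareDead(): mark the id for removal

    appendTombstone() match {           // stand-in for the ReplicaManager append
      case Right(_) =>
        md.pendingState = None          // success: metadata removed, state cleared
      case Left(_) =>
        // The bug reported above: on an append error the callback returns without
        // resetting pendingState, so every later operation on this transactional id
        // (retried expiration, InitProducerId, ...) finds a leftover pending state.
        ()
    }
  }
}
{code}
Presumably the fix is to also reset the pending state (and let a later expiration pass retry) when the append fails, which is exactly what the error branch above is missing.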