[
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alper Kanat updated KAFKA-7519:
-------------------------------
Attachment: image-2020-01-10-12-37-28-804.png
> Transactional Ids Left in Pending State by TransactionStateManager During
> Transactional Id Expiration Are Unusable
> ------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
> Issue Type: Bug
> Components: core, producer
> Affects Versions: 2.0.0
> Reporter: Bridger Howell
> Assignee: Bridger Howell
> Priority: Blocker
> Fix For: 2.0.1, 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png,
> image-2020-01-10-12-37-28-804.png
>
>
>
> After digging into a case where an exactly-once streams process was bizarrely
> unable to process incoming data, we observed the following:
> * StreamThreads stalling while creating a producer, eventually resulting in
> no consumption by that streams process. Looking into those threads, we found
> they were stuck in a loop, sending InitProducerIdRequests and always
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again.
> These requests always had the same transactional id.
> * After changing the streams process to not use exactly-once, it was able to
> process messages with no problems.
> * Alternatively, after changing the applicationId for that streams process, it
> was able to process with no problems.
> * Every hour, every broker would fail the task `transactionalId-expiration`
> with the following error:
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
>     at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.List.map(List.scala:296)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
>     at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing transaction state transition to Dead while it already a pending state Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught exception in scheduled task transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class":"kafka.utils.Logging$class"}{code}
> Based on these problems, and having read a bit of the server source, I guessed
> that all of this could be explained by TransactionMetadata instances being
> stuck in a pendingState.
> After doing a heap dump of the broker that was returning the error for our
> particular group, we found this:
> !image-2018-10-18-13-02-22-371.png|width=723,height=224!
> There were indeed a bunch of live TransactionMetadata instances that had a
> pending state of "Dead" but should have already been cleaned up, confirming
> my guess.
> Finally, after reading carefully through the TransactionStateManager callback
> that produces tombstones for expired transactional ids, I noticed that if
> the ReplicaManager returns any error, those transactions will
> _not_ have their pending state cleared.
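> The stuck-pending failure mode can be illustrated with a small, self-contained
> sketch (the class and method names below are illustrative only, not the actual
> Kafka source; the real logic lives in TransactionMetadata and
> TransactionStateManager):
> {code:java}
> // Hypothetical sketch of the stuck-pendingState failure mode described above.
> public class PendingStateDemo {
>
>     static class TxnMetadata {
>         String pendingState = null; // e.g. "Dead" while a transition is in flight
>
>         // Mirrors prepareTransitionTo's guard: refuses to start a new
>         // transition while another one is still pending.
>         void prepareDead() {
>             if (pendingState != null) {
>                 throw new IllegalStateException(
>                     "Preparing transaction state transition to Dead"
>                     + " while it already a pending state " + pendingState);
>             }
>             pendingState = "Dead";
>         }
>
>         void clearPending() {
>             pendingState = null;
>         }
>     }
>
>     // Simulates one run of the transactionalId-expiration task. The bug:
>     // when the tombstone append fails, pendingState is never cleared.
>     static void expire(TxnMetadata meta, boolean appendSucceeded) {
>         meta.prepareDead();
>         if (appendSucceeded) {
>             meta.clearPending(); // transition completes normally
>         }
>         // else: error path forgets to clear pendingState -> id is now stuck
>     }
>
>     public static void main(String[] args) {
>         TxnMetadata meta = new TxnMetadata();
>         expire(meta, false); // append fails once (e.g. during a rebalance)
>         try {
>             expire(meta, true); // every later run throws, forever
>         } catch (IllegalStateException e) {
>             System.out.println("stuck: " + e.getMessage());
>         }
>     }
> }
> {code}
> Once a broker is in this state, every hourly run of the expiration task hits
> the same guard, which matches the repeating error above.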
> —
> h3. Short summary:
> If the ReplicaManager fails to append the tombstone records for expiring a
> transactional id (in my case, this likely happened during a rebalance that
> wasn't properly rate limited), the broker fails to clear its pending state
> for that transactional id, blocking any future actions on that transactional
> id (including cleanup), until the broker is restarted or another broker
> without that problem becomes the coordinator for that transactional id.
> —
> Related:
> There was a very similar case in KAFKA-5351 where not clearing a
> TransactionMetadata's pendingState caused similar issues.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)