Bridger Howell created KAFKA-7519:
-------------------------------------
Summary: Transactional Ids Left in Pending State by
TransactionStateManager During Transactional Id Expiration Are Unusable
Key: KAFKA-7519
URL: https://issues.apache.org/jira/browse/KAFKA-7519
Project: Kafka
Issue Type: Bug
Components: core, producer
Reporter: Bridger Howell
Attachments: image-2018-10-18-13-02-22-371.png
After digging into a case where an exactly-once streams process was bizarrely
unable to process incoming data, we observed the following:
* StreamThreads stalling while creating a producer, eventually resulting in no
consumption by that streams process. Looking into those threads, we found they
were stuck in a loop, sending InitProducerIdRequests and always receiving back
the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests
always had the same transactional id.
* After we changed the streams process to not use exactly-once, it processed
messages with no problems.
* Alternatively, after we changed the applicationId for that streams process, it
processed messages with no problems.
* Every hour, every broker would fail the task `transactionalId-expiration`
with the following error:
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
  at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
  at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
  at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
  at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
  at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)","exception_class":"java.lang.IllegalStateException","exception_message":"Preparing transaction state transition to Dead while it already a pending state Dead"},"source_host":"kafka-broker-4.kafka-broker.default.svc.cluster.local","method":"error","level":"ERROR","message":"Uncaught exception in scheduled task transactionalId-expiration","mdc":{},"file":"Logging.scala","line_number":"76","thread_name":"transaction-log-manager-0","logger_name":"kafka.utils.KafkaScheduler","class":"kafka.utils.Logging$class"}
{code}
Based on these problems and having read a bit of the server source, I guessed
that all of this would be explained by TransactionMetadata instances being
stuck with a pendingState set.
After doing a heap dump of the broker that was returning the error for our
particular group, we found this:
!image-2018-10-18-13-02-22-371.png!
There were indeed a bunch of live TransactionMetadata instances that had a
pending state of "Dead" but should have already been cleaned up, confirming my
guess.
Finally, after reading carefully through the TransactionStateManager callback
for producing tombstones for expired transactional ids, I noticed that if the
ReplicaManager returns any error, those transactions will _not_ have their
pending state cleared.
---
h3. Short summary:
If the ReplicaManager fails to append the tombstone records for expiring a
transactional id (in my case, this likely happened during a rebalance that
wasn't properly rate limited), the broker fails to clear its pending state for
that transactional id, blocking any future actions on that transactional id
(including cleanup), until the broker is restarted or another broker without
that problem becomes the coordinator for that transactional id.
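
To make the failure mode concrete, here is a minimal toy model of the behaviour described above. It is not Kafka's code: the class names, the `clearPendingOnError` flag, and the `expire`/`initProducerId` methods are all hypothetical, and resetting the pending state on error is only one plausible remedy, assumed here for contrast.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for TransactionMetadata (hypothetical, heavily simplified).
final class TxnMetadataSketch {
    enum State { ONGOING, DEAD }

    State state = State.ONGOING;
    State pendingState = null; // non-null means a transition is still in flight

    void prepareTransitionTo(State target) {
        if (pendingState != null) {
            // Mirrors the kind of check that produced the hourly exception.
            throw new IllegalStateException(
                "transition to " + target + " requested while pending state is " + pendingState);
        }
        pendingState = target;
    }
}

// Toy stand-in for the coordinator side (hypothetical).
final class CoordinatorSketch {
    final Map<String, TxnMetadataSketch> txns = new HashMap<>();
    final boolean clearPendingOnError; // assumption: a possible remedy, not a confirmed fix

    CoordinatorSketch(boolean clearPendingOnError) {
        this.clearPendingOnError = clearPendingOnError;
    }

    // One run of the expiration task for a single id. appendSucceeds stands in
    // for the result the ReplicaManager reports for the tombstone append.
    void expire(String transactionalId, boolean appendSucceeds) {
        TxnMetadataSketch metadata = txns.get(transactionalId);
        if (metadata == null) {
            return;
        }
        metadata.prepareTransitionTo(TxnMetadataSketch.State.DEAD);
        // Append callback:
        if (appendSucceeds) {
            txns.remove(transactionalId); // tombstone written, id is gone
        } else if (clearPendingOnError) {
            metadata.pendingState = null; // let a later run (or a client) try again
        }
        // Otherwise pendingState stays DEAD forever, as reported.
    }

    // Simplified InitProducerId handling: any in-flight transition is rejected.
    String initProducerId(String transactionalId) {
        TxnMetadataSketch metadata =
            txns.computeIfAbsent(transactionalId, id -> new TxnMetadataSketch());
        return metadata.pendingState != null ? "CONCURRENT_TRANSACTIONS" : "NONE";
    }
}
```

With `clearPendingOnError` set to `false`, a single failed append leaves the id answering `CONCURRENT_TRANSACTIONS` on every later `initProducerId` call and makes every later `expire` call throw, which matches both symptoms in this report. With the flag set to `true`, the next expiration run can simply retry.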
---
Related:
There was a very similar issue in KAFKA-5351 where not clearing a
TransactionMetadata's pendingState caused similar issues.