[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6683.
--------------------------------
    Resolution: Fixed
 Fix Version/s: 1.1.0

ReplicaFetcher crashes with "Attempted to complete a transaction which was not started"
----------------------------------------------------------------------------------------

                 Key: KAFKA-6683
                 URL: https://issues.apache.org/jira/browse/KAFKA-6683
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 1.0.0
         Environment: os: GNU/Linux
                      arch: x86_64
                      Kernel: 4.9.77
                      jvm: OpenJDK 1.8.0
            Reporter: Chema Sanchez
            Assignee: Jason Gustafson
            Priority: Critical
             Fix For: 1.1.0
         Attachments: server.properties

We have been experiencing this issue lately when restarting or replacing brokers of our Kafka clusters during maintenance operations.

After a broker has been restarted or replaced, it may run normally for some minutes and then suddenly throw the following exception and stop replicating some partitions:

{code:none}
[2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
java.lang.IllegalArgumentException: Attempted to complete a transaction which was not started
        at kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
        at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
        at kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
        at scala.collection.immutable.List.foreach(List.scala:389)
        at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
        at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
        at kafka.log.Log.loadProducersFromLog(Log.scala:540)
        at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
        at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.Log.loadProducerState(Log.scala:514)
        at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
        at kafka.log.Log.truncateTo(Log.scala:1467)
        at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
        at kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
        at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
        at kafka.log.LogManager.truncateTo(LogManager.scala:445)
        at kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
{code}

Since all brokers in a cluster are restarted during system updates, the issue sometimes manifested at the same time on different brokers holding replicas of the same partition, which caused downtime because there were not enough in-sync replicas.

It is necessary to restart the faulted broker in order to recover partition replication, but after hitting this issue we often find that the restarted broker shuts itself down with the following error, among lots of warnings caused by corrupted indices:

{code:none}
[2018-03-05 16:02:22,450] ERROR There was an error in one of the threads during logs loading: org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current) (kafka.log.LogManager)
[2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.ProducerFencedException: Invalid producer epoch: 20 (zombie): 21 (current)
{code}

When this happened, the only way to keep Kafka up has been to delete all the data inside the log directory (/var/lib/kafka in our case).

The problem manifests randomly, but we managed to reproduce the ReplicaFetcher crash (although not the failed startup) outside our production cluster by doing this (command sketches for steps 2, 4, 7 and 9 are included after the list):

1 - Set up a Kafka cluster running 3 brokers (see attached configuration): 10, 11 and 12.
2 - Create a topic with the following settings: Topic:mytopic2, PartitionCount:12, ReplicationFactor:3, Configs:segment.bytes=52428800,retention.ms=1800000
3 - Run some producers like this:
{code:java}
while true
do
  ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 --producer-props bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 enable.idempotence=true --throughput 50 --num-records 6000 --transactional-id pruebatrans4 --transaction-duration-ms 100
done
{code}
4 - Run some consumer on mytopic2.
5 - Wait for some time for segments to be rotated.
6 - Stop broker 11, remove everything inside /var/lib/kafka, start it again.
7 - Wait for data to be replicated and all replicas to be in ISR.
8 - Stop broker 12, remove everything inside /var/lib/kafka, start it again.
9 - Wait for data to be replicated and all replicas to be in ISR.
10 - Wait for the issue to manifest. If it manifests, after some minutes of normal behaviour broker 11 may suddenly stop replicating and some partitions may appear under-replicated.

If replication after restarting node 12 takes long enough, node 11 may crash its ReplicaFetcher before the replicas on 12 are available, causing partitions to go offline.
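For steps 2 and 4, a minimal sketch with the standard Kafka CLI tools, assuming they are run from the bin directory of a Kafka 1.0.0 installation and that ZK_CONNECT and BROKER are placeholders for the real ZooKeeper and broker addresses (not part of the original report):

{code:none}
# Step 2: create the topic with the settings used in the reproduction.
./kafka-topics.sh --create \
  --zookeeper "$ZK_CONNECT" \
  --topic mytopic2 \
  --partitions 12 \
  --replication-factor 3 \
  --config segment.bytes=52428800 \
  --config retention.ms=1800000

# Step 4: run a simple consumer on mytopic2. Optionally pass
# --consumer-property isolation.level=read_committed to match the
# transactional producers started in step 3.
./kafka-console-consumer.sh --bootstrap-server "$BROKER:9092" --topic mytopic2
{code}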
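Likewise, the "all replicas in ISR" condition of steps 7 and 9 (and the under-replicated partitions of step 10) can be watched from the command line; a sketch under the same assumptions about ZK_CONNECT:

{code:none}
# Steps 7/9: poll until no partition of mytopic2 is reported as under-replicated.
# Step 10: the same command shows which partitions drop out of ISR once the
# ReplicaFetcher crashes.
while ./kafka-topics.sh --describe --zookeeper "$ZK_CONNECT" \
        --topic mytopic2 --under-replicated-partitions | grep -q .
do
  echo "still waiting for all replicas to rejoin the ISR..."
  sleep 10
done
{code}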
We have managed to reproduce the issue without deleting the log data in steps 6 and 8, but it seems more likely to manifest if we do. Which broker experiences the issue is quite random, but most of the time it is one of the already restarted brokers, though not necessarily the latest one.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)