[ https://issues.apache.org/jira/browse/KAFKA-6683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma resolved KAFKA-6683.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.1.0

> ReplicaFetcher crashes with "Attempted to complete a transaction which was 
> not started" 
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6683
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6683
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 1.0.0
>         Environment: os: GNU/Linux 
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
>            Reporter: Chema Sanchez
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 1.1.0
>
>         Attachments: server.properties
>
>
> We have been experiencing this issue lately when restarting or replacing
> brokers of our Kafka clusters during maintenance operations.
> After a broker is restarted or replaced, it may perform normally for some
> minutes and then suddenly throw the following exception and stop replicating
> some partitions:
> {code:none}
> [2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> java.lang.IllegalArgumentException: Attempted to complete a transaction which 
> was not started
>         at 
> kafka.log.ProducerStateManager.completeTxn(ProducerStateManager.scala:720)
>         at kafka.log.Log.$anonfun$loadProducersFromLog$4(Log.scala:540)
>         at 
> kafka.log.Log.$anonfun$loadProducersFromLog$4$adapted(Log.scala:540)
>         at scala.collection.immutable.List.foreach(List.scala:389)
>         at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:35)
>         at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:35)
>         at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:44)
>         at kafka.log.Log.loadProducersFromLog(Log.scala:540)
>         at kafka.log.Log.$anonfun$loadProducerState$5(Log.scala:521)
>         at kafka.log.Log.$anonfun$loadProducerState$5$adapted(Log.scala:514)
>         at scala.collection.Iterator.foreach(Iterator.scala:929)
>         at scala.collection.Iterator.foreach$(Iterator.scala:929)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log.loadProducerState(Log.scala:514)
>         at kafka.log.Log.$anonfun$truncateTo$2(Log.scala:1487)
>         at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.truncateTo(Log.scala:1467)
>         at kafka.log.LogManager.$anonfun$truncateTo$2(LogManager.scala:454)
>         at 
> kafka.log.LogManager.$anonfun$truncateTo$2$adapted(LogManager.scala:445)
>         at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
>         at kafka.log.LogManager.truncateTo(LogManager.scala:445)
>         at 
> kafka.server.ReplicaFetcherThread.$anonfun$maybeTruncate$1(ReplicaFetcherThread.scala:281)
>         at scala.collection.Iterator.foreach(Iterator.scala:929)
>         at scala.collection.Iterator.foreach$(Iterator.scala:929)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> kafka.server.ReplicaFetcherThread.maybeTruncate(ReplicaFetcherThread.scala:265)
>         at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeTruncate$2(AbstractFetcherThread.scala:135)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>         at 
> kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:132)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {code}
> Because all brokers in a cluster are restarted during system updates, the
> issue has sometimes manifested at the same time on different brokers holding
> replicas of the same partition, which caused downtime due to not enough
> in-sync replicas (ISR).
> Restarting the faulted broker is necessary to recover partition replication,
> but after hitting this issue the restarted broker often shuts itself down
> with the following error, among many warnings about corrupted indices:
> {code:none}
> [2018-03-05 16:02:22,450] ERROR There was an error in one of the threads 
> during logs loading: org.apache.kafka.common.errors.ProducerFencedException: 
> Invalid producer epoch: 20 (zombie): 21 (current) (kafka.log.LogManager)
> [2018-03-05 16:02:22,453] FATAL [KafkaServer id=10] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Invalid producer 
> epoch: 20 (zombie): 21 (current)
> {code}
> When this happens, the only way to keep Kafka up has been to delete all the
> data inside the log directory (/var/lib/kafka in our case).
> The problem manifests randomly, but we managed to reproduce the
> ReplicaFetcher crash (although not the failed startup) outside our
> production cluster by doing this:
>   1 - Set up a Kafka cluster running 3 brokers (see attached configuration):
> 10, 11 and 12
>   2 - Create a topic with the following settings: Topic:mytopic2, 
> PartitionCount:12, ReplicationFactor:3, 
> Configs:segment.bytes=52428800,retention.ms=1800000
>   3 - Run some producers like this:
> {code:java}
> while true
> do
>  ./kafka-producer-perf-test.sh --topic mytopic2 --record-size=2048 
> --producer-props 
> bootstrap.servers=ec2-XXX-XXX-XXX-XXX.eu-west-1.compute.amazonaws.com:9092 
> enable.idempotence=true --throughput 50 --num-records 6000 --transactional-id 
> pruebatrans4 --transaction-duration-ms 100
> done
> {code}
>  4 - Run a consumer on mytopic2.
>  5 - Wait for some time for segments to be rotated.
>  6 - Stop broker 11, remove everything inside /var/lib/kafka, start it again.
>  7 - Wait for data to be replicated and all replicas to be in the ISR.
>  8 - Stop broker 12, remove everything inside /var/lib/kafka, start it again.
>  9 - Wait for data to be replicated and all replicas to be in the ISR.
>  10 - Wait for the issue to manifest. If it manifests, after some minutes of
> normal behaviour, broker 11 may suddenly stop replicating and some partitions
> may appear under-replicated.
> If replication after restarting node 12 takes long enough, node 11 may crash
> its ReplicaFetcher before the replicas on 12 are available, causing
> partitions to go offline. We have managed to reproduce the issue without
> deleting log data in steps 6 and 8, but it seems more likely to manifest if
> we do. Which broker experiences the issue is quite random, but most of the
> time it seems to be one of the already restarted brokers, though not
> necessarily the latest one.
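
Since the crash in the report only shows up some minutes after the restart, it may help to scan the broker log for it while running the reproduction steps. Below is a minimal sketch in Python, not part of the original report: the function name `crashed_fetchers` and the `SAMPLE` text are illustrative, and the pattern assumes the log line layout shown in the excerpt above (a `[ReplicaFetcher replicaId=…, leaderId=…, fetcherId=…]` prefix followed by `Error due to` or `Stopped`). A fetcher is flagged only when it logs an error and later logs that it stopped.

```python
import re

# Pattern assumed from the log excerpt in the report. \s+ tolerates the
# line wrapping introduced by email clients.
EVENT = re.compile(
    r"\[ReplicaFetcher\s+replicaId=(\d+),\s+leaderId=(\d+),\s+fetcherId=(\d+)\]"
    r"\s+(Error due to|Stopped)"
)


def crashed_fetchers(log_text):
    """Return (replicaId, leaderId, fetcherId) tuples for fetcher threads
    that logged 'Error due to' and afterwards 'Stopped'."""
    errored = set()
    crashed = []
    for m in EVENT.finditer(log_text):
        key = tuple(int(g) for g in m.group(1, 2, 3))
        if m.group(4) == "Error due to":
            errored.add(key)
        elif key in errored and key not in crashed:
            # 'Stopped' after an earlier error: treat as a crash.
            crashed.append(key)
    return crashed


# Sample text copied from the log excerpt in the report.
SAMPLE = """\
[2018-03-15 17:23:01,482] ERROR [ReplicaFetcher replicaId=12, leaderId=10,
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
java.lang.IllegalArgumentException: Attempted to complete a transaction which
was not started
[2018-03-15 17:23:01,497] INFO [ReplicaFetcher replicaId=12, leaderId=10,
fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
"""

if __name__ == "__main__":
    for replica, leader, fetcher in crashed_fetchers(SAMPLE):
        print(f"replica {replica} stopped fetching from leader {leader} "
              f"(fetcher {fetcher})")
```

A `Stopped` line with no preceding error for the same fetcher is ignored, since that also occurs on a clean shutdown.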



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
