[
https://issues.apache.org/jira/browse/KAFKA-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stanislav Kozlovski resolved KAFKA-8036.
----------------------------------------
Resolution: Fixed
I was wrong. After writing a test to verify the behavior
(https://github.com/stanislavkozlovski/kafka/commit/b93a866e2318c47ff7538ddecd2fa38f29abd188),
I found that KAFKA-7897 fixed this issue across all versions it affected.
> Log dir reassignment on followers fails with FileNotFoundException for the
> leader epoch cache on leader election
> ----------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8036
> URL: https://issues.apache.org/jira/browse/KAFKA-8036
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 1.1.1, 2.0.1
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Major
>
> When changing a partition's log directories for a follower broker, we move
> all the data related to that partition to the other log dir (as per
> [KIP-113|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113:+Support+replicas+movement+between+log+directories]).
> On a successful move, we rename the original directory by adding a suffix
> consisting of a UUID and `-delete` (e.g. `test_log_dir-0` would be renamed to
> `test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete`).
> We copy every log file and [initialize a new leader epoch file
> cache|https://github.com/apache/kafka/blob/0d56f1413557adabc736cae2dffcdc56a620403e/core/src/main/scala/kafka/log/Log.scala#L768].
> The problem is that we do not update the associated `Replica` class' leader
> epoch cache - it still points to the old `LeaderEpochFileCache` instance.
> This results in a FileNotFoundException when the broker is [elected as a
> leader for the
> partition|https://github.com/apache/kafka/blob/255f4a6effdc71c273691859cd26c4138acad778/core/src/main/scala/kafka/cluster/Partition.scala#L312].
> This has the unintended side effect of marking the log directory as offline,
> resulting in all partitions from that log directory becoming unavailable for
> the specific broker.
> h2. Exception and logs
> I reproduced this locally by running two brokers. The steps to reproduce:
> {code:java}
> Create partition replicated across two brokers (A, B) with leader A
> Move partition leadership to B
> Alter log dirs on A
> Move partition leadership back to A{code}
> This results in a log directory structure on broker A similar to this:
> {code:java}
> ├── new_dir
> │   ├── cleaner-offset-checkpoint
> │   ├── log-start-offset-checkpoint
> │   ├── meta.properties
> │   ├── recovery-point-offset-checkpoint
> │   ├── replication-offset-checkpoint
> │   └── test_log_dir-0
> │       ├── 00000000000000000000.index
> │       ├── 00000000000000000000.log
> │       ├── 00000000000000000000.timeindex
> │       └── leader-epoch-checkpoint
> └── old_dir
>     ├── cleaner-offset-checkpoint
>     ├── log-start-offset-checkpoint
>     ├── meta.properties
>     ├── recovery-point-offset-checkpoint
>     ├── replication-offset-checkpoint
>     └── test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete
>         ├── 00000000000000000000.index
>         ├── 00000000000000000000.log
>         ├── 00000000000000000000.timeindex
>         ├── 00000000000000000009.snapshot
>         └── leader-epoch-checkpoint
> {code}
>
>
> {code:java}
> [2019-03-04 15:36:56,854] INFO [Partition test_log_dir-0 broker=0] test_log_dir-0 starts at Leader Epoch 3 from offset 9. Previous Leader Epoch was: 2 (kafka.cluster.Partition)
> [2019-03-04 15:36:56,855] WARN [LeaderEpochCache test_log_dir-0] New epoch entry EpochEntry(epoch=3, startOffset=9) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=1, startOffset=9)). Cache now contains 2 entries. (kafka.server.epoch.LeaderEpochFileCache)
> [2019-03-04 15:36:56,857] ERROR Error while writing to checkpoint file /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
> java.io.FileNotFoundException: /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint.tmp (No such file or directory)
>     at java.base/java.io.FileOutputStream.open0(Native Method)
>     at java.base/java.io.FileOutputStream.open(FileOutputStream.java:299)
>     at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:238)
>     at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:188)
>     at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
>     at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
>     at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:64)
>     at kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:219)
>     at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp(LeaderEpochFileCache.scala:62)
>     at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
>     at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
>     at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:52)
>     at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:395)
>     at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:394)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:394)
>     at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:367)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
>     at kafka.cluster.Partition.makeLeader(Partition.scala:367)
>     at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1162)
>     at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1160)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1160)
>     at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1072)
>     at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>     at java.base/java.lang.Thread.run(Thread.java:844)
> [2019-03-04 15:36:56,864] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /logs/old_dir (kafka.server.ReplicaManager)
> {code}
> As you can see from the stack trace, the `LeaderEpochFileCache` held by
> `Replica#epochs` still points to the old
> `/logs/old_dir/test_log_dir-0/leader-epoch-checkpoint` file.
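
For readers who want to see the failure mode in isolation, here is a minimal sketch in plain Java. It is not Kafka code: `EpochCheckpoint` is a hypothetical stand-in for `LeaderEpochFileCache`, the temp directories and file contents are placeholders, and the only thing it is meant to show is that an object bound to a path before the directory rename fails with a FileNotFoundException on its next write, while one built against the new directory does not.

```java
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class StaleCheckpointSketch {

    // Hypothetical stand-in for LeaderEpochFileCache: bound to one path at construction.
    static final class EpochCheckpoint {
        private final Path file;

        EpochCheckpoint(Path partitionDir) {
            this.file = partitionDir.resolve("leader-epoch-checkpoint");
        }

        // Write to a sibling ".tmp" file first, then move it over the checkpoint.
        void write(String contents) throws IOException {
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
                out.write(contents.getBytes(StandardCharsets.UTF_8));
            }
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    // Returns true if a cache created before the rename fails with FileNotFoundException.
    static boolean staleCacheFailsAfterRename() {
        try {
            Path oldDir = Files.createTempDirectory("old_dir");
            Path partitionDir = Files.createDirectory(oldDir.resolve("test_log_dir-0"));
            EpochCheckpoint cache = new EpochCheckpoint(partitionDir);
            cache.write("placeholder contents\n");

            // Simulate the rename that follows a successful move to the other log dir.
            Files.move(partitionDir,
                oldDir.resolve("test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete"));

            try {
                // The cache still targets old_dir/test_log_dir-0, which no longer exists.
                cache.write("placeholder contents\n");
                return false;
            } catch (FileNotFoundException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if a cache rebuilt against the new directory can write its checkpoint.
    static boolean reboundCacheSucceeds() {
        try {
            Path newDir = Files.createTempDirectory("new_dir");
            Path partitionDir = Files.createDirectory(newDir.resolve("test_log_dir-0"));
            EpochCheckpoint cache = new EpochCheckpoint(partitionDir);
            cache.write("placeholder contents\n");
            return Files.exists(partitionDir.resolve("leader-epoch-checkpoint"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("stale cache fails: " + staleCacheFailsAfterRename());
        System.out.println("rebound cache succeeds: " + reboundCacheSucceeds());
    }
}
```

The sketch only models the stale reference; it says nothing about how KAFKA-7897 actually resolved it.
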
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)