[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
[ https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640391#comment-16640391 ]

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji closed pull request #5749: KAFKA-7415; Persist leader epoch and start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5749

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e3a8186094f..ac0de9dba1d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -280,8 +280,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 4b65e439e2c..462f1f3cc23 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
@@ -55,7 +56,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
-  val epochs = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9b423ba5933..eeb569a0771 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -39,7 +39,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import java.util.Map.{Entry => JEntry}
@@ -208,7 +208,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
 
   locally {
     val startMs = time.milliseconds
@@ -218,12 +218,12 @@ class Log(@volatile var dir: File,
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)
 
     loadProducerState
[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
[ https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640178#comment-16640178 ]

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji opened a new pull request #5749: KAFKA-7415; Persist leader epoch and start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5749

Note this is a backport of #5678 for 1.1

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
> ---
>
>                 Key: KAFKA-7415
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7415
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 2.0.0
>            Reporter: Anna Povzner
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 2.0.1, 2.1.0
>
>
> If the follower's last appended epoch is ahead of the leader's last appended
> epoch, the OffsetsForLeaderEpoch response will incorrectly send
> (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), and the follower will truncate to
> HW. This may lead to data loss in some rare cases where 2 back-to-back leader
> elections happen (failure of one leader, followed by quick re-election of the
> next leader due to preferred leader election, so that all replicas are still
> in the ISR, and then failure of the 3rd leader).
> The bug is in LeaderEpochFileCache.endOffsetFor(), which returns
> (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) if the requested leader epoch is
> ahead of the last leader epoch in the cache. The method should return (last
> leader epoch in the cache, LEO) in this scenario.
> We don't create an entry in the leader epoch cache until a message is appended
> with the new leader epoch. Every append to the log calls
> LeaderEpochFileCache.assign(). However, it would be much cleaner if
> `makeLeader` created an entry in the cache as soon as the replica becomes a
> leader, which will fix the bug. In case the leader never appends any
> messages, and the next leader epoch starts with the same offset, we already
> have clearAndFlushLatest() that clears entries with start offsets greater than
> or equal to the passed offset. LeaderEpochFileCache.assign() could be merged
> with clearAndFlushLatest(), so that we clear cache entries with offsets equal
> to or greater than the start offset of the new epoch, and we do not need to
> call these methods separately.
>
> Here is an example of a scenario where the issue leads to data loss.
> Suppose we have three replicas: r1, r2, and r3. Initially, the ISR consists
> of (r1, r2, r3) and the leader is r1. The data up to offset 10 has been
> committed to the ISR. Here is the initial state:
> {code:java}
> Leader: r1
> leader epoch: 0
> ISR(r1, r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=10]
> r3: [hw=5, leo=10]
> {code}
> Replica 1 fails and leaves the ISR, which makes Replica 2 the new leader with
> leader epoch = 1. The leader appends a batch, but it is not replicated yet to
> the followers.
> {code:java}
> Leader: r2
> leader epoch: 1
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 3 is elected leader (due to preferred leader election) before it
> has a chance to truncate, with leader epoch 2.
> {code:java}
> Leader: r3
> leader epoch: 2
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 2 sends OffsetsForLeaderEpoch(leader epoch = 1) to Replica 3. Replica
> 3 incorrectly replies with UNDEFINED_EPOCH_OFFSET, and Replica 2 truncates to
> HW. If Replica 3 fails before Replica 2 re-fetches the data, this may lead to
> data loss.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
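
The final step of the scenario in the issue description can be traced with a small model. The sketch below is not Kafka's `LeaderEpochFileCache`; `EpochCacheSketch`, its `EpochEntry` and the free-standing `endOffsetFor` are hypothetical stand-ins, and the start offset 0 for epoch 0 is an assumption (the description only says data up to offset 10 is committed). It shows why r3, holding only epoch-0 entries, answers a query for epoch 1 with an undefined result, and how caching (epoch 2, start offset 10) on becoming leader changes the answer.

```scala
object EpochCacheSketch {
  // Simplified stand-in for a cache entry; NOT Kafka's actual class.
  final case class EpochEntry(epoch: Int, startOffset: Long)

  val UndefinedEpoch: Int = -1
  val UndefinedEpochOffset: Long = -1L

  // Lookup in the spirit of the behaviour described in the issue:
  // the end offset of an epoch is the start offset of the next cached epoch,
  // and the result is undefined when no later epoch is cached.
  def endOffsetFor(entries: Vector[EpochEntry],
                   logEndOffset: Long,
                   requestedEpoch: Int): (Int, Long) = {
    if (requestedEpoch == UndefinedEpoch || entries.isEmpty)
      (UndefinedEpoch, UndefinedEpochOffset)
    else if (requestedEpoch == entries.last.epoch)
      (requestedEpoch, logEndOffset)
    else {
      val (later, earlierOrEqual) = entries.partition(_.epoch > requestedEpoch)
      if (later.isEmpty || earlierOrEqual.isEmpty)
        (UndefinedEpoch, UndefinedEpochOffset)
      else
        (earlierOrEqual.last.epoch, later.head.startOffset)
    }
  }

  def main(args: Array[String]): Unit = {
    // r3 as new leader: only epoch-0 data in its log, log end offset 10.
    val withoutLeaderEntry = Vector(EpochEntry(0, 0L))
    // r2 asks about epoch 1: nothing later is cached, so the answer is undefined
    // and r2 would fall back to truncating to its high watermark.
    println(endOffsetFor(withoutLeaderEntry, 10L, 1)) // (-1,-1)

    // With the entry cached on becoming leader (epoch 2 starts at offset 10),
    // the same query yields a usable truncation point.
    val withLeaderEntry = withoutLeaderEntry :+ EpochEntry(2, 10L)
    println(endOffsetFor(withLeaderEntry, 10L, 1)) // (0,10)
  }
}
```

In this model r2 would truncate to offset 10, discarding only the unreplicated batch from epoch 1, instead of truncating to its high watermark of 8.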
[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
[ https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638878#comment-16638878 ]

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji closed pull request #5678: KAFKA-7415; Persist leader epoch and start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5678

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2036bb09da8..307fb81447b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
       leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
       zkVersion = partitionStateInfo.basePartitionState.zkVersion
-      val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+      // In the case of successive leader elections in a short time period, a follower may have
+      // entries in its log from a later epoch than any entry in the new leader's log. In order
+      // to ensure that these followers can truncate to the right offset, we must cache the new
+      // leader epoch and the start offset since it should be larger than any epoch that a follower
+      // would try to query.
+      leaderReplica.epochs.foreach { epochCache =>
+        epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+      }
+
+      val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index d729dadcb48..22860c71475 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d69b6..4e335ccc33b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCach
[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW
[ https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16624312#comment-16624312 ]

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji opened a new pull request #5678: KAFKA-7415; Persist leader epoch and start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5678

This patch ensures that, when a broker becomes leader, the leader epoch cache is updated with the latest epoch, using the log end offset as that epoch's start offset. This guarantees that the leader will be able to provide the right truncation point even if the follower has data from leader epochs which the leader itself does not have. This situation can occur when there are back-to-back leader elections.

Additionally, we have made the following changes:

1. The leader epoch cache enforces monotonically increasing epochs and start offsets among its entries. Whenever a new entry is appended which violates this requirement, we remove the conflicting entries from the cache.
2. Previously we returned an unknown epoch and offset if the queried epoch came before the first entry in the cache. Now we return the requested epoch together with the start offset of the earliest entry in the cache. For example, if the earliest entry in the cache is (epoch=5, startOffset=10), then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that followers (and consumers in KIP-320) can always determine where the correct starting point is for the active log range on the leader.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
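
The two additional changes listed in the pull request description can be illustrated with a small model. This is a sketch under stated assumptions, not the code from the patch: `MonotonicEpochCacheSketch`, its `EpochEntry`, and the pure `assign` and `endOffsetFor` functions are hypothetical. It assumes a "conflicting" entry is any cached entry whose epoch or start offset is not strictly smaller than the new entry's, that re-assigning the current epoch at a later offset is a no-op, and that a query beyond the latest cached epoch stays undefined.

```scala
object MonotonicEpochCacheSketch {
  // Hypothetical, simplified model; NOT Kafka's LeaderEpochFileCache.
  final case class EpochEntry(epoch: Int, startOffset: Long)

  val Undefined: (Int, Long) = (-1, -1L)

  // Change 1: keep epochs and start offsets strictly increasing by dropping
  // every cached entry that conflicts with the entry being appended.
  def assign(entries: Vector[EpochEntry], epoch: Int, startOffset: Long): Vector[EpochEntry] = {
    // Assumption: the latest epoch is already cached, so a later offset changes nothing.
    val alreadyCached =
      entries.lastOption.exists(e => e.epoch == epoch && e.startOffset <= startOffset)
    if (alreadyCached) entries
    else {
      val retained = entries.filter(e => e.epoch < epoch && e.startOffset < startOffset)
      retained :+ EpochEntry(epoch, startOffset)
    }
  }

  // Change 2: a query for an epoch older than the first cached entry returns
  // the requested epoch with the start offset of the earliest entry.
  def endOffsetFor(entries: Vector[EpochEntry],
                   logEndOffset: Long,
                   requestedEpoch: Int): (Int, Long) = {
    if (entries.isEmpty || requestedEpoch < 0 || requestedEpoch > entries.last.epoch)
      Undefined // assumption: left undefined in this sketch
    else if (requestedEpoch == entries.last.epoch)
      (requestedEpoch, logEndOffset)
    else if (requestedEpoch < entries.head.epoch)
      (requestedEpoch, entries.head.startOffset)
    else {
      val (later, earlierOrEqual) = entries.partition(_.epoch > requestedEpoch)
      (earlierOrEqual.last.epoch, later.head.startOffset)
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = assign(assign(Vector.empty, 5, 10L), 6, 20L)

    // Epoch 7 starts at offset 15, so the entry (6, 20) conflicts and is dropped.
    println(assign(cache, 7, 15L)) // Vector(EpochEntry(5,10), EpochEntry(7,15))

    // The example from the description: earliest entry (5, 10), query for epoch 4.
    println(endOffsetFor(cache, 25L, 4)) // (4,10)
  }
}
```

Modelling the cache as an immutable `Vector` keeps the sketch free of locking and checkpoint-file handling, which the real cache would also have to deal with.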