[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640391#comment-16640391
 ] 

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji closed pull request #5749: KAFKA-7415; Persist leader epoch and start 
offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5749
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e3a8186094f..ac0de9dba1d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -280,8 +280,17 @@ class Partition(val topic: String,
   leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
   leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
   zkVersion = partitionStateInfo.basePartitionState.zkVersion
-  val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+  // In the case of successive leader elections in a short time period, a follower may have
+  // entries in its log from a later epoch than any entry in the new leader's log. In order
+  // to ensure that these followers can truncate to the right offset, we must cache the new
+  // leader epoch and the start offset since it should be larger than any epoch that a follower
+  // would try to query.
+  leaderReplica.epochs.foreach { epochCache =>
+    epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+  }
+
+  val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
   val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
   val curTimeMs = time.milliseconds
   // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 4b65e439e2c..462f1f3cc23 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,6 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
@@ -55,7 +56,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
-  val epochs = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9b423ba5933..eeb569a0771 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -39,7 +39,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.{Time, Utils}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import java.util.Map.{Entry => JEntry}
@@ -208,7 +208,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+  @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache()
 
   locally {
     val startMs = time.milliseconds
@@ -218,12 +218,12 @@ class Log(@volatile var dir: File,
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
 
-    _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
+    _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
 
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
-    _leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+    _leaderEpochCache.truncateFromStart(logStartOffset)
 
     loadProducerState

[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640178#comment-16640178
 ] 

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji opened a new pull request #5749: KAFKA-7415; Persist leader epoch and 
start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5749
 
 
   Note this is a backport of #5678 for 1.1
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing 
> truncation to HW
> ---
>
> Key: KAFKA-7415
> URL: https://issues.apache.org/jira/browse/KAFKA-7415
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 2.0.0
>Reporter: Anna Povzner
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> If the follower's last appended epoch is ahead of the leader's last appended 
> epoch, the OffsetsForLeaderEpoch response will incorrectly send 
> (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), and the follower will truncate to 
> HW. This may lead to data loss in some rare cases where 2 back-to-back leader 
> elections happen (failure of one leader, followed by quick re-election of the 
> next leader due to preferred leader election, so that all replicas are still 
> in the ISR, and then failure of the 3rd leader).
> The bug is in LeaderEpochFileCache.endOffsetFor(), which returns 
> (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) if the requested leader epoch is 
> ahead of the last leader epoch in the cache. The method should return (last 
> leader epoch in the cache, LEO) in this scenario.
> We don't create an entry in the leader epoch cache until a message is appended
> with the new leader epoch. Every append to the log calls
> LeaderEpochFileCache.assign(). However, it would be much cleaner if
> `makeLeader` created an entry in the cache as soon as the replica becomes
> leader, which would fix the bug. In case the leader never appends any
> messages, and the next leader epoch starts with the same offset, we already
> have clearAndFlushLatest(), which clears entries with start offsets greater
> than or equal to the passed offset. LeaderEpochFileCache.assign() could be
> merged with clearAndFlushLatest(), so that assigning a new epoch also clears
> cache entries with start offsets greater than or equal to the start offset of
> the new epoch, and the two methods no longer need to be called separately.
>  
> Here is an example of a scenario where the issue leads to data loss.
> Suppose we have three replicas: r1, r2, and r3. Initially, the ISR consists 
> of (r1, r2, r3) and the leader is r1. The data up to offset 10 has been 
> committed to the ISR. Here is the initial state:
> {code:java}
> Leader: r1
> leader epoch: 0
> ISR(r1, r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=10]
> r3: [hw=5, leo=10]
> {code}
> Replica 1 fails and leaves the ISR, which makes Replica 2 the new leader with 
> leader epoch = 1. The leader appends a batch, but it is not replicated yet to 
> the followers.
> {code:java}
> Leader: r2
> leader epoch: 1
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 3 is elected leader (due to preferred leader election), with leader
> epoch 2, before it has a chance to truncate.
> {code:java}
> Leader: r3
> leader epoch: 2
> ISR(r2, r3)
> r1: [hw=10, leo=10]
> r2: [hw=8, leo=11]
> r3: [hw=5, leo=10]
> {code}
> Replica 2 sends OffsetsForLeaderEpoch(leader epoch = 1) to Replica 3. Replica 
> 3 incorrectly replies with UNDEFINED_EPOCH_OFFSET, and Replica 2 truncates to 
> HW. If Replica 3 fails before Replica 2 re-fetches the data, this may lead to 
> data loss.
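
The follower's side of this scenario can be sketched as follows. This is a simplified model, not Kafka's replica fetcher code: `FollowerTruncationSketch` and `truncationOffset` are hypothetical names, the sentinel value -1 for UNDEFINED_EPOCH_OFFSET is an assumption, and capping the leader's end offset at the follower's own log end offset is assumed rather than stated in the description above.

```scala
object FollowerTruncationSketch {
  // Assumed sentinel for UNDEFINED_EPOCH_OFFSET.
  val UndefinedEpochOffset: Long = -1L

  // Offset the follower truncates its log to, given the end offset in the
  // leader's OffsetsForLeaderEpoch reply. An undefined reply falls back to
  // the follower's high watermark.
  def truncationOffset(leaderEndOffset: Long, highWatermark: Long, logEndOffset: Long): Long =
    if (leaderEndOffset == UndefinedEpochOffset) highWatermark
    else math.min(leaderEndOffset, logEndOffset)

  def main(args: Array[String]): Unit = {
    // Replica 2 from the scenario: hw=8, leo=11.
    val onUndefinedReply = truncationOffset(UndefinedEpochOffset, highWatermark = 8L, logEndOffset = 11L)
    val onDefinedReply = truncationOffset(10L, highWatermark = 8L, logEndOffset = 11L)
    println(s"undefined reply -> truncate to $onUndefinedReply")
    println(s"end offset 10   -> truncate to $onDefinedReply")
  }
}
```

With Replica 2's state, an undefined reply yields a truncation offset of 8, which drops offsets 8 and 9 from Replica 2's log even though they were committed. A reply carrying end offset 10 yields 10, which drops only the unreplicated batch.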



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638878#comment-16638878
 ] 

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji closed pull request #5678: KAFKA-7415; Persist leader epoch and start 
offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5678
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2036bb09da8..307fb81447b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -301,8 +301,17 @@ class Partition(val topic: String,
   leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
   leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
   zkVersion = partitionStateInfo.basePartitionState.zkVersion
-  val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)
 
+  // In the case of successive leader elections in a short time period, a follower may have
+  // entries in its log from a later epoch than any entry in the new leader's log. In order
+  // to ensure that these followers can truncate to the right offset, we must cache the new
+  // leader epoch and the start offset since it should be larger than any epoch that a follower
+  // would try to query.
+  leaderReplica.epochs.foreach { epochCache =>
+    epochCache.assign(leaderEpoch, leaderEpochStartOffset)
+  }
+
+  val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
   val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
   val curTimeMs = time.milliseconds
   // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index d729dadcb48..22860c71475 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,7 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -55,7 +55,7 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
 
-  val epochs: Option[LeaderEpochCache] = log.map(_.leaderEpochCache)
+  val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache)
 
   info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue")
   log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue))
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index afe151d69b6..4e335ccc33b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,15 +32,15 @@ import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrd
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
-import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -229,7 +229,7 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
-  @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCach

[jira] [Commented] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16624312#comment-16624312
 ] 

ASF GitHub Bot commented on KAFKA-7415:
---

hachikuji opened a new pull request #5678: KAFKA-7415; Persist leader epoch and 
start offset on becoming a leader
URL: https://github.com/apache/kafka/pull/5678
 
 
   This patch ensures that, when a broker becomes leader, the leader epoch cache
is updated with the latest epoch, using the log end offset as that epoch's start
offset. This guarantees that the leader will be able to provide the right
truncation point even if the follower has data from leader epochs which the
leader itself does not have. This situation can occur when there are
back-to-back leader elections.
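
The effect of caching the new epoch on becoming leader can be illustrated with a small model. This is a sketch under assumptions, not the patched code: the cache is modelled as a `TreeMap` from leader epoch to start offset, `BecomeLeaderSketch` and its `endOffsetFor` are hypothetical names, and `None` stands in for the undefined epoch and offset. The values mirror the scenario from the issue, where the new leader has a log end offset of 10 and no data from epoch 1; the start offset 0 for epoch 0 is assumed.

```scala
import scala.collection.immutable.TreeMap

object BecomeLeaderSketch {
  // Hypothetical model: leader epoch -> start offset of that epoch on this broker.
  type EpochCache = TreeMap[Int, Long]

  // End offset of `requestedEpoch` as seen by the leader, or None if the
  // leader has no entry that bounds the requested epoch.
  def endOffsetFor(cache: EpochCache, logEndOffset: Long, requestedEpoch: Int): Option[Long] =
    if (cache.nonEmpty && cache.lastKey == requestedEpoch) Some(logEndOffset)
    else cache.collectFirst { case (epoch, startOffset) if epoch > requestedEpoch => startOffset }

  def main(args: Array[String]): Unit = {
    val logEndOffset = 10L
    // Before the patch: nothing is cached for epoch 2 until the first append.
    val withoutNewEpoch: EpochCache = TreeMap(0 -> 0L)
    // After the patch: (epoch 2, start offset = log end offset) is cached on becoming leader.
    val withNewEpoch: EpochCache = withoutNewEpoch.updated(2, logEndOffset)

    println(endOffsetFor(withoutNewEpoch, logEndOffset, requestedEpoch = 1))
    println(endOffsetFor(withNewEpoch, logEndOffset, requestedEpoch = 1))
  }
}
```

In this model the first lookup finds no bounding entry, while the second resolves the follower's query for epoch 1 to end offset 10, the start offset of epoch 2.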
   
   Additionally, we have made the following changes:
   
   1. The leader epoch cache enforces monotonically increasing epochs and
starting offsets among its entries. Whenever a new entry is appended which
violates this requirement, we remove the conflicting entries from the cache.
   2. Previously we returned an unknown epoch and offset if an epoch is queried
which comes before the first entry in the cache. Now we return the queried epoch
together with the smallest start offset in the cache.
For example, if the earliest entry in the cache is (epoch=5, startOffset=10),
then a query for epoch 4 will return (epoch=4, endOffset=10). This ensures that
followers (and consumers in KIP-320) can always determine where the correct
starting point is for the active log range on the leader.
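
The two rules above can be sketched with a small in-memory cache. This is a simplified illustration, not the actual `LeaderEpochFileCache`: the names `EpochCacheSketch`, `SimpleEpochCache` and `snapshot` are hypothetical, there is no locking or checkpoint file, and the -1 sentinels for the undefined epoch and offset are assumptions.

```scala
object EpochCacheSketch {
  final case class EpochEntry(epoch: Int, startOffset: Long)

  // Assumed sentinels for an undefined epoch and offset.
  val UndefinedEpoch: Int = -1
  val UndefinedOffset: Long = -1L

  final class SimpleEpochCache(logEndOffset: () => Long) {
    private var entries = Vector.empty[EpochEntry]

    def snapshot: Vector[EpochEntry] = entries

    // Rule 1: keep epochs and start offsets strictly increasing by removing
    // any existing entry that conflicts with the one being appended.
    def assign(epoch: Int, startOffset: Long): Unit = {
      val updateNeeded = entries.lastOption.forall { last =>
        last.epoch != epoch || startOffset < last.startOffset
      }
      if (updateNeeded) {
        val retained = entries.filter(e => e.epoch < epoch && e.startOffset < startOffset)
        entries = retained :+ EpochEntry(epoch, startOffset)
      }
    }

    // Returns (epoch, end offset) for the requested epoch.
    def endOffsetFor(requestedEpoch: Int): (Int, Long) =
      if (entries.lastOption.exists(_.epoch == requestedEpoch)) {
        // The latest epoch ends at the current log end offset.
        (requestedEpoch, logEndOffset())
      } else {
        val (later, earlierOrEqual) = entries.partition(_.epoch > requestedEpoch)
        if (later.isEmpty) (UndefinedEpoch, UndefinedOffset)
        // Rule 2: a query before the first entry returns the queried epoch
        // with the smallest start offset in the cache.
        else if (earlierOrEqual.isEmpty) (requestedEpoch, later.head.startOffset)
        else (earlierOrEqual.last.epoch, later.head.startOffset)
      }
  }

  def main(args: Array[String]): Unit = {
    val cache = new SimpleEpochCache(() => 20L)
    cache.assign(5, 10L)
    println(cache.endOffsetFor(4)) // query before the first entry
    cache.assign(6, 15L)
    cache.assign(7, 12L)           // conflicts with (6, 15), which is removed
    println(cache.snapshot)
  }
}
```

In this sketch a query for epoch 4 against a cache holding only (epoch=5, startOffset=10) returns (4, 10), matching the example in item 2. Assigning (7, 12) after (6, 15) removes the epoch 6 entry because its start offset is not smaller than 12.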
   
   

