jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {
Review comment:
First, thanks a lot for thinking through this code and providing such a detailed comment. This code is important to get right.
> the requested epoch is larger than any known epoch.
For this case I decided to throw an exception because the Fetch request handling code already checks for this condition and returns an error Fetch response. The leader returns an error Fetch response when this invariant is violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other words, based on the current implementation, I think it is a bug if `endOffsetForEpoch` returns `Optional.empty()`.
1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954
2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621
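To make the invariant concrete, here is a minimal sketch of the check; the class and method names are illustrative, not the actual `KafkaRaftClient` code:
```java
// Minimal sketch of the invariant the leader enforces before consulting the
// log; names here are assumptions for illustration only.
final class FetchEpochGuard {
    static boolean isValidLastFetchedEpoch(int lastFetchedEpoch, int currentLeaderEpoch) {
        // A follower can never have fetched records from an epoch newer than
        // the leader's current epoch. When this check fails, the leader
        // answers with an error Fetch response and endOffsetForEpoch is
        // never reached.
        return lastFetchedEpoch <= currentLeaderEpoch;
    }
}
```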
> the requested epoch is less than any known epoch we have
When thinking through this case I convinced myself that the leader can determine if it should send a snapshot simply by comparing the "fetch offset" and "last fetched epoch" against the `oldestSnapshotId`. The `oldestSnapshotId` is the snapshot with an end offset equal to the log start offset.
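As a rough sketch of that comparison (this is not code from the PR; it assumes `OffsetAndEpoch` exposes its `offset` and `epoch` fields):
```java
import org.apache.kafka.raft.OffsetAndEpoch;

// Hypothetical helper illustrating the decision described above.
final class SnapshotFetchCheck {
    static boolean shouldSendSnapshot(
        long fetchOffset,
        int lastFetchedEpoch,
        OffsetAndEpoch oldestSnapshotId // snapshot whose end offset == log start offset
    ) {
        // If the follower's position is older than the oldest snapshot, the
        // leader has already truncated the records and epoch information
        // needed to validate the fetch, so it must send the snapshot instead.
        return fetchOffset < oldestSnapshotId.offset ||
            lastFetchedEpoch < oldestSnapshotId.epoch;
    }
}
```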
> The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the offset corresponding to the oldest snapshot, which should be the same as the log start offset. Right so far?
Correct. My comment here assumes that the fetch offset is between the log start offset and the log end offset, and that sending a snapshot is not required. When thinking through the code in `ReplicatedLog::endOffsetForEpoch`, I always interpreted it as trying to find the largest offset at which the follower could have diverged. I would argue that the lower bound for this is the `oldestSnapshotId`, hence the modification when the leader's epoch cache couldn't find the end offset for the given epoch.
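In code, the detection reads roughly like this (a hypothetical fragment, again assuming `OffsetAndEpoch` exposes `offset` and `epoch`):
```java
// Hypothetical sketch of the detection described above: the epoch cache
// signals "epoch not found" by echoing the requested epoch with an end
// offset equal to the log start offset, which for a compacted raft log
// equals the oldest snapshot's end offset.
final class EpochCacheCheck {
    static boolean epochNotFoundInCache(
        OffsetAndEpoch cacheResult, // result of log.endOffsetForEpoch(requestedEpoch)
        int requestedEpoch,
        OffsetAndEpoch oldestSnapshotId
    ) {
        return cacheResult.epoch == requestedEpoch &&
            cacheResult.offset == oldestSnapshotId.offset;
    }
}
```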
> However, we can handle both cases the same. We can send the latest snapshot id to the follower, which will cause it to truncate.
Regarding the "last fetched epoch" > `quorum.epoch()` case:
Even though this cannot happen based on my earlier comment, why not return
`ValidatedFetchOffsetAndEpoch.diverging(logEndOffset, quorum.epoch())`? This is
much easier for me to reason about and I think it will have the same effect on
the follower. The follower will truncate to the end offset where the largest
local epoch <= the leader epoch?
For example, Sending `largestSnapshotId` will not cause the follower to
`maybeTruncateFullyToLatestSnapshot` because `largestSnapshotId.epoch <
log.latestEpoch` on the follower.
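To illustrate the follower-side check being referenced (a sketch only; the real logic lives in `KafkaRaftClient`, and the method name here is an assumption):
```java
// Hypothetical sketch: the follower replaces its log with a snapshot only
// when the snapshot's epoch is at least as large as its own latest epoch.
final class FollowerTruncationCheck {
    static boolean wouldTruncateFullyToLatestSnapshot(
        OffsetAndEpoch latestSnapshotId, // snapshot id reported by the leader
        int followerLatestEpoch          // log.latestEpoch on the follower
    ) {
        // With largestSnapshotId.epoch < log.latestEpoch this returns false,
        // matching the example above.
        return latestSnapshotId.epoch >= followerLatestEpoch;
    }
}
```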
I need to think about it some more, but I think the reality is that `KafkaRaftClient` doesn't handle the case very well where `quorum.epoch()` is not strictly increasing after a leader change.
### Proposed Changes
I think we can make this code much easier to understand if we remove `ReplicatedLog::endOffsetForEpoch` and instead add a method to `ReplicatedLog` with the following signature: `ValidOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch)`. We can move the code in `KafkaRaftClient::validateFetchOffsetAndEpoch` to this new method. I'll also rename `ValidatedFetchOffsetAndEpoch` to `ValidOffsetAndEpoch`, but let me know if you have a better name. :)
```java
/**
 * Validate the given offset and epoch against the log and the oldest snapshot.
 *
 * Returns the largest valid offset and epoch given `offset` and `epoch` as the upper bound.
 * This can result in one of three possible return values:
 *
 * 1. ValidOffsetAndEpoch.valid if the given offset and epoch is valid in the log.
 *
 * 2. ValidOffsetAndEpoch.diverging if the given offset and epoch is not valid, and the
 *    largest valid offset and epoch is in the log.
 *
 * 3. ValidOffsetAndEpoch.snapshot if the given offset and epoch is not valid, and the
 *    largest valid offset and epoch is less than the oldest snapshot.
 *
 * @param offset the offset to validate
 * @param epoch the epoch of the record at offset - 1
 * @return the largest valid offset and epoch
 */
ValidOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch);
```
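To make the three cases concrete, here is a rough sketch of how `KafkaMetadataLog` could implement this; the boundary conditions and the `ValidOffsetAndEpoch` factory signatures are assumptions, not final code:
```java
// Rough sketch only; helpers and factories are assumed for illustration.
public ValidOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch) {
    // Case 3: the position is older than the oldest snapshot, so the largest
    // valid offset and epoch can only be served from the snapshot.
    Optional<OffsetAndEpoch> oldestSnapshot = oldestSnapshotId();
    if (oldestSnapshot.isPresent() &&
        (offset < oldestSnapshot.get().offset || epoch < oldestSnapshot.get().epoch)) {
        return ValidOffsetAndEpoch.snapshot(oldestSnapshot.get());
    }

    // Ask the epoch cache for the end offset of the largest epoch <= epoch.
    // This would be internal, since ReplicatedLog::endOffsetForEpoch goes away.
    OffsetAndEpoch endOffsetAndEpoch = endOffsetForEpoch(epoch);
    if (endOffsetAndEpoch.epoch != epoch || endOffsetAndEpoch.offset < offset) {
        // Case 2: the follower diverged; it should truncate to this offset
        // and epoch and then retry the fetch.
        return ValidOffsetAndEpoch.diverging(endOffsetAndEpoch);
    }

    // Case 1: the given offset and epoch are consistent with the log.
    return ValidOffsetAndEpoch.valid(new OffsetAndEpoch(offset, epoch));
}
```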
I like this design because it would also allow us to move some of the tests from `KafkaRaftClientSnapshotTest` to `KafkaMetadataLogTest`. Tests in `KafkaMetadataLogTest` are much easier to write and read. This design will also require the code to handle all possible values for `offset` and `epoch`, since it is in `ReplicatedLog` and we cannot assume a specific implementation of `RaftClient`.
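For example, a test could look roughly like this (sketched in Java for consistency with the snippet above; `buildMetadataLog`, `appendBatch`, and the exact `ValidOffsetAndEpoch` accessors are assumptions, not existing test APIs):
```java
// Hypothetical test sketch illustrating the kind of coverage this enables.
@Test
public void testFetchOffsetOlderThanOldestSnapshot() {
    ReplicatedLog log = buildMetadataLog(); // assumed helper

    // Append records in epoch 1 and snapshot at offset 10, so that
    // oldestSnapshotId becomes (offset = 10, epoch = 1).
    appendBatch(log, 10, 1); // assumed helper
    log.createSnapshot(new OffsetAndEpoch(10, 1));

    // A fetch position older than the oldest snapshot should be answered
    // with a "snapshot" result instead of log records.
    assertEquals(
        ValidOffsetAndEpoch.snapshot(new OffsetAndEpoch(10, 1)),
        log.validateOffsetAndEpoch(5, 1)
    );
}
```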
What do you think?