hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564989172
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
if (records.sizeInBytes == 0)
throw new IllegalArgumentException("Attempt to append an empty record
set")
- val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
- leaderEpoch = epoch,
- origin = AppendOrigin.Coordinator)
- new LogAppendInfo(appendInfo.firstOffset.getOrElse {
- throw new KafkaException("Append failed unexpectedly")
- }, appendInfo.lastOffset)
+ handleAndConvertLogAppendInfo(
+ log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+ leaderEpoch = epoch,
+ origin = AppendOrigin.Coordinator
+ )
+ )
}
override def appendAsFollower(records: Records): LogAppendInfo = {
if (records.sizeInBytes == 0)
throw new IllegalArgumentException("Attempt to append an empty record
set")
- val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
- new LogAppendInfo(appendInfo.firstOffset.getOrElse {
- throw new KafkaException("Append failed unexpectedly")
- }, appendInfo.lastOffset)
+
handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+ }
+
+ private def handleAndConvertLogAppendInfo(appendInfo:
kafka.log.LogAppendInfo): LogAppendInfo = {
+ appendInfo.firstOffset match {
+ case Some(firstOffset) =>
+ if (firstOffset.relativePositionInSegment == 0) {
+ // Assume that a new segment was created if the relative position is 0
+ log.deleteOldSegments()
+ }
+ new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+ case None =>
+ throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+ }
}
override def lastFetchedEpoch: Int = {
- log.latestEpoch.getOrElse(0)
+ log.latestEpoch.getOrElse {
+ latestSnapshotId.map { snapshotId =>
+ val logEndOffset = endOffset().offset
+ if (snapshotId.offset == startOffset && snapshotId.offset ==
logEndOffset) {
+ // Return the epoch of the snapshot when the log is empty
+ snapshotId.epoch
+ } else {
+ throw new KafkaException(
+ s"Log doesn't have a last fetch epoch and there is a snapshot
($snapshotId). " +
+ s"Expected the snapshot's end offset to match the log's end offset
($logEndOffset) " +
+ s"and the log start offset ($startOffset)"
+ )
+ }
+ }.orElse(0)
+ }
}
override def endOffsetForEpoch(leaderEpoch: Int):
Optional[raft.OffsetAndEpoch] = {
val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch
=>
- new raft.OffsetAndEpoch(offsetAndEpoch.offset,
offsetAndEpoch.leaderEpoch)
+ if (oldestSnapshotId.isPresent() &&
+ offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+ offsetAndEpoch.leaderEpoch == leaderEpoch) {
Review comment:
Always tough to think my way through this logic. So here we are handling
the case where the client has requested the end offset for an epoch which is
smaller than the epochs of all entries remaining in the log. The current epoch
cache implementation handles this by returning the requested epoch with an end
offset equal to the log start offset. So we detect this case by checking that the
returned epoch matches the requested epoch and the end offset matches the log
start offset, which we assume to be equal to the offset corresponding to the
oldest snapshot. Right so far?
So there are still a couple of cases that are possible after verifying this:
1. The requested epoch matches the snapshot epoch. We know then that the end
offset of this epoch must be equal to the snapshot offset. This is a case where
a new epoch begins at the first offset in the log.
2. The requested epoch is less than the snapshot epoch. The follower might
have been offline for a long time and is requesting an ancient epoch which we
no longer have any knowledge of.
I'm convinced about the handling for the first case, but less sure about the
second one. The guarantee that this API provides is that it will return the
largest epoch which is less than or equal to the requested epoch, along with
that epoch's end offset. But in this case, the snapshot epoch is larger than the
requested epoch. In fact, we have no way of satisfying this API because we have
lost the old data.
Thinking in terms of what we want to happen might be helpful. We want to
detect this as an out-of-range fetch, which causes the follower to fetch from
the latest snapshot. What if we return `Optional.empty`, since we cannot
determine the end offset for the requested epoch? Currently we do the following
in `KafkaRaftClient`:
```java
OffsetAndEpoch endOffsetAndEpoch =
log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
return new IllegalStateException(
String.format(
"Expected to find an end offset for epoch %s since it
must be less than the current epoch %s",
lastFetchedEpoch,
quorum.epoch()
)
);
});
```
There are two cases we want to handle: the requested epoch is smaller than any
epoch we know of, or the requested epoch is larger than any epoch we know of. The
latter case might be possible if there was an "unclean" election of some kind.
However, we can handle both cases the same way: we can send the latest snapshot id
to the follower, which will cause it to truncate. The follower may or may not
see this as a loss of committed data and fail, but that is ok. At least we will
get a clear message about the failure.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org