hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564989172
##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########

```diff
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")

-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }

   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")

-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }

   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }

   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {
```

Review comment:
Always tough to think my way through this logic. So here we are handling the case where the client has requested the end offset for an epoch that is smaller than the epoch of any entry remaining in the log. The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and that the end offset matches the log start offset, which we assume to be equal to the offset of the oldest snapshot. Right so far?

There are still a couple of cases that are possible after verifying this:

1. The requested epoch matches the snapshot epoch. We know then that the end offset must be equal to the snapshot offset. This is the case where a new epoch begins on the first offset in the log.
2. The requested epoch is less than the snapshot epoch. The follower might have been offline for a long time and is requesting an ancient epoch which we no longer have any knowledge of.

I'm convinced about the handling for the first case, but less sure about the second. The guarantee this API provides is that it returns the largest epoch which is less than or equal to the requested epoch, along with its end offset. But in this case, the snapshot epoch is larger than the requested epoch. In fact, we have no way of satisfying the API's contract because we have lost the old data.

Thinking in terms of what we want to happen might be helpful. We want to detect this as an out-of-range fetch, which causes the follower to fetch from the latest snapshot. What if we return `Optional.empty` since we cannot determine the end offset for the requested epoch?
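To make that case analysis concrete, here is a minimal, self-contained Scala sketch of the decision described above. It is not the PR's implementation: `OffsetAndEpoch` is a stand-in for `raft.OffsetAndEpoch`, `cacheResult` stands for whatever the leader epoch cache returned, and `oldestSnapshotId` mirrors the accessor used in the diff.

```scala
import java.util.Optional

// Illustrative stand-in for raft.OffsetAndEpoch; not the real class.
final case class OffsetAndEpoch(offset: Long, epoch: Int)

// Hypothetical sketch, not the PR's code. We assume the log start offset
// equals the oldest snapshot's offset, as argued above.
def endOffsetForEpochSketch(
  requestedEpoch: Int,
  cacheResult: OffsetAndEpoch,
  oldestSnapshotId: Option[OffsetAndEpoch]
): Optional[OffsetAndEpoch] = {
  oldestSnapshotId match {
    // Detection: the cache echoed the requested epoch with an end offset
    // equal to the log start offset (== the oldest snapshot's offset).
    case Some(snapshotId)
        if cacheResult.offset == snapshotId.offset &&
          cacheResult.epoch == requestedEpoch =>
      if (requestedEpoch == snapshotId.epoch) {
        // Case 1: a new epoch begins at the first offset in the log, so the
        // end offset for the requested epoch is exactly the snapshot offset.
        Optional.of(OffsetAndEpoch(snapshotId.offset, snapshotId.epoch))
      } else {
        // Case 2: the requested epoch predates the snapshot. The data needed
        // to answer is gone, so report "unknown" and let the caller treat
        // this as an out-of-range fetch.
        Optional.empty()
      }
    case _ =>
      // Normal path: the epoch cache's answer is authoritative.
      Optional.of(cacheResult)
  }
}
```

The key point is the guard: once the cache has echoed the requested epoch at the log start offset, only the comparison against the snapshot epoch distinguishes a legitimate answer from a lost-data case.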
Currently we do the following in `KafkaRaftClient`:

```java
OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
    return new IllegalStateException(
        String.format(
            "Expected to find an end offset for epoch %s since it must be less than the current epoch %s",
            lastFetchedEpoch,
            quorum.epoch()
        )
    );
});
```

There are two cases we want to handle: the requested epoch is less than any epoch we know of, and the requested epoch is larger than any epoch we know of. The latter might be possible if there was an "unclean" election of some kind. However, we can handle both cases the same way: send the latest snapshot id to the follower, which will cause it to truncate. The follower may or may not see this as a loss of committed data and fail, but that is okay. At least we will get a clear message about the failure.
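A rough sketch of that proposed fallback, reusing the `OffsetAndEpoch` stand-in from the sketch above. The real `KafkaRaftClient` is Java and structured quite differently; `latestSnapshotId` and the function shape here are illustrative assumptions, not the actual API.

```scala
import java.util.Optional

// Hypothetical sketch of the proposed fallback; `OffsetAndEpoch` is the
// stand-in case class defined in the previous sketch.
def divergingOffsetSketch(
  lastFetchedEpoch: Int,
  endOffsetForEpoch: Int => Optional[OffsetAndEpoch],
  latestSnapshotId: Optional[OffsetAndEpoch]
): OffsetAndEpoch = {
  val fromLog = endOffsetForEpoch(lastFetchedEpoch)
  if (fromLog.isPresent) {
    fromLog.get
  } else if (latestSnapshotId.isPresent) {
    // Covers both unknown-epoch cases: the follower's epoch is older than
    // anything we retain, or newer than anything we know (e.g. after an
    // "unclean" election). Returning the latest snapshot id makes the
    // follower truncate and fetch the snapshot; if it treats this as loss
    // of committed data, it fails with a clear message.
    latestSnapshotId.get
  } else {
    // No log answer and no snapshot: nothing sensible to return.
    throw new IllegalStateException(
      s"No end offset for epoch $lastFetchedEpoch and no snapshot to fall back to"
    )
  }
}
```

Falling back to the latest snapshot id collapses both unknown-epoch cases into one code path, at the cost of possibly forcing a follower to discard data it considered committed, which at least fails loudly as noted above.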