jolshan opened a new pull request #9590: URL: https://github.com/apache/kafka/pull/9590
The current behavior for KafkaConsumer.beginningOffsets on compacted topics is to always return offset 0. However, when looking at the record with the earliest timestamp in the log with KafkaConsumer.offsetsForTimes, the offset it nonzero. This PR seeks to correct this issue by updating the segment baseOffsets and the logStartOffset during compaction. The segment offset will be the offset of the first non-compacted record, or the original baseOffset if the segment is empty. There are a few key changes/decisions to note that may not be immediately apparent: 1. **Producer Snapshot Behavior** Currently the behavior of producer snapshots is that during compaction, if the segment offset of the producer snapshot file is not in the new segment offsets, the snapshot is deleted. This will lead to behavior like the following: - So for example we have segments with offsets 0 and 1, we want to delete first segment New segment is base offset 0 so snapshot for segment 1 deleted. One segment, no snapshot files Under this PR, the new behavior is this - We have segments with offsets 0 and 1, we want to delete first segment New segment is base offset 1 so we don't delete snapshot. One segment, one snapshot file I was not sure if this is behavior we wanted, so I'm pointing it out. 2. **High Watermark Compaction** I wasn't sure if we allowed compaction past the high watermark. It seems like there is not a check to prevent it. If we want to update the log start offset, we need to add a special condition that allows segment compaction to update the log start offset past the high watermark, so I've done that for this PR. 3. **Update to firstOverflowSegment** I noticed that the check for overflow is different than the check we use in code. I switched over to the one we use in code since the old version will lead to failed tests. There is a scenario where the first record in a record batch is compacted. This will lead to the segment baseOffset being greater than batch.baseOffset. This would cause a test to fail unnecessarily. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org