jsancio commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1968145487
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1086,63 +1088,79 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     var shallowOffsetOfMaxTimestamp = -1L
     var readFirstMessage = false
     var lastOffsetOfFirstBatch = -1L
+    var skipRemainingBatches = false
     records.batches.forEach { batch =>
       if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != leaderEpoch) {
         throw new InvalidRecordException("Append from Raft leader did not set the batch epoch correctly")
       }
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0) {
         throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
-
-      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
-      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
-      // For magic version 2, we can get the first offset directly from the batch header.
-      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
-      // case, validation will be more lenient.
-      // Also indicate whether we have the accurate first offset or not
-      if (!readFirstMessage) {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = batch.baseOffset
-        lastOffsetOfFirstBatch = batch.lastOffset
-        readFirstMessage = true
       }
-      // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
-        monotonic = false
-
-      // update the last offset seen
-      lastOffset = batch.lastOffset
-      lastLeaderEpoch = batch.partitionLeaderEpoch
-
-      // Check if the message sizes are valid.
-      val batchSize = batch.sizeInBytes
-      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
-        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
-          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
-      }
+      /* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
+       * leadership. This can happen if sending FETCH responses is slow because there is a race between sending the FETCH
+       * response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
+       * persisting up to the partition leader epoch of the leader when the FETCH request was handled. See KAFKA-18723 for
+       * more details.
+       */
+      skipRemainingBatches = skipRemainingBatches || hasInvalidPartitionLeaderEpoch(batch, origin, leaderEpoch);
+      if (skipRemainingBatches) {
+        info(s"Skipping batch $batch because origin is $origin and leader epoch is $leaderEpoch")
Review Comment:
Agreed. Fixed.
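
For readers following this thread: `hasInvalidPartitionLeaderEpoch` is not shown in this hunk. Below is a minimal sketch of what such a check could look like, assuming it only guards replication appends and compares the batch's epoch against the leader epoch that was current when the FETCH request was handled. The signature, visibility, import paths, and the `AppendOrigin.REPLICATION` condition are assumptions for illustration, not the PR's actual code.

```scala
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.storage.internals.log.AppendOrigin

// Hypothetical sketch only; the PR's real helper may differ.
// A batch is treated as invalid when its partition leader epoch is newer than
// the leader epoch the replica knew when it handled the FETCH response, i.e.
// the batch was produced by a newer leader than the one fetched from.
private def hasInvalidPartitionLeaderEpoch(
  batch: RecordBatch,
  origin: AppendOrigin,
  leaderEpoch: Int
): Boolean = {
  // Assumption: only replication appends need this guard; RAFT_LEADER and
  // CLIENT appends are validated earlier in this method.
  origin == AppendOrigin.REPLICATION && batch.partitionLeaderEpoch > leaderEpoch
}
```

Per the comment and the `skipRemainingBatches` flag in the diff, the replica skips the offending batch and everything after it (logging at info) rather than failing the whole append.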