junrao commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1955301937


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1301,27 +1301,35 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, 
isFuture: Boolean): Option[LogAppendInfo] = {
+  private def doAppendRecordsToFollowerOrFutureReplica(
+    records: MemoryRecords,
+    isFuture: Boolean,
+    maxEpoch: Int
+  ): Option[LogAppendInfo] = {
     if (isFuture) {
       // The read lock is needed to handle race condition if request handler 
thread tries to
       // remove future replica after receiving AlterReplicaLogDirsRequest.
       inReadLock(leaderIsrUpdateLock) {
         // Note the replica may be undefined if it is removed by a 
non-ReplicaAlterLogDirsThread before
         // this method is called
-        futureLog.map { _.appendAsFollower(records) }
+        futureLog.map { _.appendAsFollower(records, maxEpoch) }
       }
     } else {
       // The lock is needed to prevent the follower replica from being updated 
while ReplicaAlterDirThread
       // is executing maybeReplaceCurrentWithFutureReplica() to replace 
follower replica with the future replica.
       futureLogLock.synchronized {
-        Some(localLogOrException.appendAsFollower(records))
+        Some(localLogOrException.appendAsFollower(records, maxEpoch))
       }
     }
   }
 
-  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: 
Boolean): Option[LogAppendInfo] = {
+  def appendRecordsToFollowerOrFutureReplica(
+    records: MemoryRecords,
+    isFuture: Boolean,
+    maxEpoch: Int

Review Comment:
   Rename `maxEpoch` => `leaderEpochForReplica`?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1086,63 +1088,79 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     var shallowOffsetOfMaxTimestamp = -1L
     var readFirstMessage = false
     var lastOffsetOfFirstBatch = -1L
+    var skipRemainingBatches = false
 
     records.batches.forEach { batch =>
       if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != 
leaderEpoch) {
         throw new InvalidRecordException("Append from Raft leader did not set 
the batch epoch correctly")
       }
       // we only validate V2 and higher to avoid potential compatibility 
issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == 
AppendOrigin.CLIENT && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == 
AppendOrigin.CLIENT && batch.baseOffset != 0) {
         throw new InvalidRecordException(s"The baseOffset of the record batch 
in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
-
-      // update the first offset if on the first message. For magic versions 
older than 2, we use the last offset
-      // to avoid the need to decompress the data (the last offset can be 
obtained directly from the wrapper message).
-      // For magic version 2, we can get the first offset directly from the 
batch header.
-      // When appending to the leader, we will update LogAppendInfo.baseOffset 
with the correct value. In the follower
-      // case, validation will be more lenient.
-      // Also indicate whether we have the accurate first offset or not
-      if (!readFirstMessage) {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = batch.baseOffset
-        lastOffsetOfFirstBatch = batch.lastOffset
-        readFirstMessage = true
       }
 
-      // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
-        monotonic = false
-
-      // update the last offset seen
-      lastOffset = batch.lastOffset
-      lastLeaderEpoch = batch.partitionLeaderEpoch
-
-      // Check if the message sizes are valid.
-      val batchSize = batch.sizeInBytes
-      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
-        
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size in the 
append to $topicPartition is $batchSize bytes " +
-          s"which exceeds the maximum configured value of 
${config.maxMessageSize}.")
-      }
+      /* During replication of uncommitted data it is possible for the remote 
replica to send record batches after it lost
+       * leadership. This can happend if sending FETCH responses is slowed 
because there is a race between sending the FETCH

Review Comment:
   Typo: "happend" should be "happened".



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1159,6 +1177,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validBytesCount, lastOffsetOfFirstBatch, 
Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }
 
+  /**
+   * Return true if the record batch should not be appending to the log.

Review Comment:
   "Return true if the record batch should not be appending to the log" =>
"Return true if the record batch has a higher leader epoch than the replica"?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1786,8 +1788,16 @@ private boolean handleFetchResponse(
                 }
             } else {
                 Records records = 
FetchResponse.recordsOrFail(partitionResponse);
-                if (records.sizeInBytes() > 0) {
+                try {
+                    // TODO: make sure to test corrupted records in kafka 
metadata log
                     appendAsFollower(records);
+                } catch (CorruptRecordException | InvalidRecordException e) {
+                    // TODO: this should log up to 265 bytes from the records

Review Comment:
   Is this TODO done yet?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1786,8 +1788,16 @@ private boolean handleFetchResponse(
                 }
             } else {
                 Records records = 
FetchResponse.recordsOrFail(partitionResponse);
-                if (records.sizeInBytes() > 0) {
+                try {
+                    // TODO: make sure to test corrupted records in kafka 
metadata log

Review Comment:
   Should this TODO be removed?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -333,7 +336,9 @@ abstract class AbstractFetcherThread(name: String,
             // In this case, we only want to process the fetch response if the 
partition state is ready for fetch and
             // the current offset is the same as the offset requested.
             val fetchPartitionData = sessionPartitions.get(topicPartition)
-            if (fetchPartitionData != null && fetchPartitionData.fetchOffset 
== currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+            if (fetchPartitionData != null &&

Review Comment:
   It's possible that the fetch response is for an old leader epoch. It would
be useful to further validate whether the leader epoch in the fetch request
matches the leader epoch in the current fetch state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to