junrao commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1962522903


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1817,14 +1839,15 @@ private void appendAsFollower(
         partitionState.updateState();
 
         OffsetAndEpoch endOffset = endOffset();
-        kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - 
info.firstOffset + 1);
         kafkaRaftMetrics.updateLogEnd(endOffset);
         logger.trace("Follower end offset updated to {} after append", 
endOffset);
+
+        appendInfo.ifPresent(
+            info -> kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - 
info.firstOffset + 1)

Review Comment:
   Could we move this inside the try/catch where appendInfo is created? This 
avoids the need to make appendInfo an Optional.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1802,10 +1804,30 @@ private boolean handleFetchResponse(
         }
     }
 
-    private void appendAsFollower(
-        Records records
-    ) {
-        LogAppendInfo info = log.appendAsFollower(records);
+    private static String convertToHexadecimal(Records records) {
+        ByteBuffer buffer = ((MemoryRecords) records).buffer();
+        byte[] bytes = new byte[Math.min(buffer.remaining(), 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD)];
+        buffer.get(bytes);
+
+        return HexFormat.of().formatHex(bytes);
+    }
+
+    private void appendAsFollower(Records records) {
+        if (records.sizeInBytes() == 0) {
+            // Nothing to do if there are no bytes in the response
+            return;
+        }
+
+        Optional<LogAppendInfo> appendInfo = Optional.empty();
+        try {
+            appendInfo = Optional.of(log.appendAsFollower(records, 
quorum.epoch()));

Review Comment:
   `quorum.epoch()` could change between the fetch request is issued and the 
fetch response is received, right? If so, we need to use the epoch used when 
creating the fetch request.



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -480,22 +481,36 @@ class PartitionTest extends AbstractPartitionTest {
     }
 
     partition.createLogIfNotExists(isNew = true, isFutureReplica = false, 
offsetCheckpoints, None)
+    var partitionState = new LeaderAndIsrRequest.PartitionState()
+      .setControllerEpoch(0)
+      .setLeader(2)
+      .setLeaderEpoch(prevLeaderEpoch)
+      .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
+      .setIsNew(false)
+    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None))
 
     val appendThread = new Thread {
       override def run(): Unit = {
-        val records = createRecords(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes),
-          new SimpleRecord("k2".getBytes, "v2".getBytes)),
-          baseOffset = 0)
-        partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = 
false)
+        val records = createRecords(
+          List(
+            new SimpleRecord("k1".getBytes, "v1".getBytes),
+            new SimpleRecord("k2".getBytes, "v2".getBytes)
+          ),
+          baseOffset = 0,
+          partitionLeaderEpoch = prevLeaderEpoch
+        )
+        partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = 
false, prevLeaderEpoch)
       }
     }
     appendThread.start()
     TestUtils.waitUntilTrue(() => appendSemaphore.hasQueuedThreads, "follower 
log append is not called.")
 
-    val partitionState = new LeaderAndIsrRequest.PartitionState()
+    partitionState = new LeaderAndIsrRequest.PartitionState()
       .setControllerEpoch(0)
       .setLeader(2)
-      .setLeaderEpoch(1)
+      .setLeaderEpoch(prevLeaderEpoch + 1)

Review Comment:
   If we change leaderEpoch, the partition epoch should also change, right?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -1383,10 +1432,32 @@ class UnifiedLogTest {
     // this is a bit contrived. to trigger the duplicate case for a follower 
append, we have to append
     // a batch with matching sequence numbers, but valid increasing offsets
     assertEquals(0L, log.logEndOffset)
-    log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, 
Compression.NONE, pid, epoch, baseSequence,
-      partitionLeaderEpoch, new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)))
-    log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, 
Compression.NONE, pid, epoch, baseSequence,
-      partitionLeaderEpoch, new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes)))
+    log.appendAsFollower(
+      MemoryRecords.withIdempotentRecords(
+        0L,
+        Compression.NONE,
+        pid,
+        epoch,

Review Comment:
   epoch => producerEpoch



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5177,9 +5177,12 @@ class ReplicaManagerTest {
         replicaManager.getPartition(topicPartition) match {
           case HostedPartition.Online(partition) =>
             partition.appendRecordsToFollowerOrFutureReplica(
-              records = MemoryRecords.withRecords(Compression.NONE, 0,
-                new SimpleRecord("first message".getBytes)),
-              isFuture = false
+              records = MemoryRecords.withRecords(
+                Compression.NONE, 0,
+                new SimpleRecord("first message".getBytes)
+              ),
+              isFuture = false,
+              partitionLeaderEpoch = Int.MaxValue

Review Comment:
   Should we just use 0 as the leader epoch?



##########
core/src/test/scala/unit/kafka/server/MockFetcherThread.scala:
##########
@@ -115,6 +125,11 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       batches.headOption.map(_.lastOffset).getOrElse(-1)))
   }
 
+  private def hasInvalidPartitionLeaderEpoch(batch: RecordBatch, leaderEpoch: 
Int): Boolean = {

Review Comment:
   hasInvalidPartitionLeaderEpoch => hasHigherPartitionLeaderEpoch?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -1904,9 +1988,94 @@ class UnifiedLogTest {
 
     val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes = 
second.sizeInBytes - 1))
 
-    log.appendAsFollower(first)
+    log.appendAsFollower(first, Int.MaxValue)
     // the second record is larger then limit but appendAsFollower does not 
validate the size.
-    log.appendAsFollower(second)
+    log.appendAsFollower(second, Int.MaxValue)
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[InvalidMemoryRecordsProvider])
+  def testInvalidMemoryRecords(records: MemoryRecords, expectedException: 
Optional[Class[Exception]]): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val previousEndOffset = log.logEndOffsetMetadata.messageOffset
+
+    if (expectedException.isPresent()) {
+      assertThrows(
+        expectedException.get(),
+        () => log.appendAsFollower(records, Int.MaxValue)
+      );
+    } else {
+        log.appendAsFollower(records, Int.MaxValue)
+    }
+
+    assertEquals(previousEndOffset, log.logEndOffsetMetadata.messageOffset)
+  }
+
+  @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+  def testRandomRecords(
+    @ForAll(supplier = classOf[ArbitraryMemoryRecords]) records: MemoryRecords
+  ): Unit = {
+    val tempDir = TestUtils.tempDir()
+    val logDir = TestUtils.randomPartitionLogDir(tempDir)
+    try {
+      val logConfig = LogTestUtils.createLogConfig()
+      val log = createLog(logDir, logConfig)
+      val previousEndOffset = log.logEndOffsetMetadata.messageOffset
+
+      // Depedning on the random corruption, unified log sometimes throws and 
sometimes returns an

Review Comment:
   typo Depedning



##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -108,12 +118,93 @@ final class KafkaMetadataLogTest {
       classOf[RuntimeException],
       () => {
         log.appendAsFollower(
-          MemoryRecords.withRecords(initialOffset, Compression.NONE, 
currentEpoch, recordFoo)
+          MemoryRecords.withRecords(initialOffset, Compression.NONE, 
currentEpoch, recordFoo),
+          currentEpoch
         )
       }
     )
   }
 
+  @Test
+  def testEmptyAppendNotAllowed(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
log.appendAsFollower(MemoryRecords.EMPTY, 1));
+    assertThrows(classOf[IllegalArgumentException], () => 
log.appendAsLeader(MemoryRecords.EMPTY, 1));
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[InvalidMemoryRecordsProvider])
+  def testInvalidMemoryRecords(records: MemoryRecords, expectedException: 
Optional[Class[Exception]]): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+    val previousEndOffset = log.endOffset().offset()
+
+    val action: Executable = () => log.appendAsFollower(records, Int.MaxValue)
+    if (expectedException.isPresent()) {
+      assertThrows(expectedException.get, action)
+    } else {
+      assertThrows(classOf[CorruptRecordException], action)
+    }
+
+    assertEquals(previousEndOffset, log.endOffset().offset())
+  }
+
+  @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)

Review Comment:
   What's the behavior if we get some random bytes that actually form a valid 
batch?



##########
clients/src/test/java/org/apache/kafka/common/record/InvalidMemoryRecordsProvider.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
+    // Use a baseOffset that not zero so that is less likely to match the LEO
+    private static final long BASE_OFFSET = 1234;
+    private static final int EPOCH = 4321;
+
+    /** Returns a stream of arguements for invalid memory records and the 
expected exception.

Review Comment:
   typo arguements



##########
clients/src/test/java/org/apache/kafka/common/record/InvalidMemoryRecordsProvider.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
+    // Use a baseOffset that not zero so that is less likely to match the LEO

Review Comment:
   that not zero => that's not zero
   so that is less likely => so that it is less likely



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1159,6 +1180,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validBytesCount, lastOffsetOfFirstBatch, 
Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }
 
+  /**
+   * Return true if the record batch has a higher leader epoch than the 
specified leader epoch
+   *
+   * @param batch the batch to validate
+   * @param origin the reason for appending the record batch
+   * @param leaderEpoch the epoch to compare
+   * @return true if the append reason is replication and the batch's 
partition leader epoch is
+   *         greater than the leader epoch, otherwise false
+   */
+  private def hasInvalidPartitionLeaderEpoch(

Review Comment:
   hasInvalidPartitionLeaderEpoch => hasHigherPartitionLeaderEpoch ?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1086,63 +1088,82 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     var shallowOffsetOfMaxTimestamp = -1L
     var readFirstMessage = false
     var lastOffsetOfFirstBatch = -1L
+    var skipRemainingBatches = false
 
     records.batches.forEach { batch =>
       if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != 
leaderEpoch) {
-        throw new InvalidRecordException("Append from Raft leader did not set 
the batch epoch correctly")
+        throw new InvalidRecordException(
+          s"Append from Raft leader did not set the batch epoch correctly, 
expected $leaderEpoch " +
+          s"but the batch has ${batch.partitionLeaderEpoch}"
+        )
       }
       // we only validate V2 and higher to avoid potential compatibility 
issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == 
AppendOrigin.CLIENT && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == 
AppendOrigin.CLIENT && batch.baseOffset != 0) {
         throw new InvalidRecordException(s"The baseOffset of the record batch 
in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
-
-      // update the first offset if on the first message. For magic versions 
older than 2, we use the last offset
-      // to avoid the need to decompress the data (the last offset can be 
obtained directly from the wrapper message).
-      // For magic version 2, we can get the first offset directly from the 
batch header.
-      // When appending to the leader, we will update LogAppendInfo.baseOffset 
with the correct value. In the follower
-      // case, validation will be more lenient.
-      // Also indicate whether we have the accurate first offset or not
-      if (!readFirstMessage) {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = batch.baseOffset
-        lastOffsetOfFirstBatch = batch.lastOffset
-        readFirstMessage = true
       }
 
-      // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
-        monotonic = false
-
-      // update the last offset seen
-      lastOffset = batch.lastOffset
-      lastLeaderEpoch = batch.partitionLeaderEpoch
-
-      // Check if the message sizes are valid.
-      val batchSize = batch.sizeInBytes
-      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
-        
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size in the 
append to $topicPartition is $batchSize bytes " +
-          s"which exceeds the maximum configured value of 
${config.maxMessageSize}.")
-      }
+      /* During replication of uncommitted data it is possible for the remote 
replica to send record batches after it lost
+       * leadership. This can happen if sending FETCH responses is slow. There 
is a race between sending the FETCH
+       * response and the replica truncating and appending to the log. The 
replicating replica resolves this issue by only
+       * persisting up to the partition leader epoch of the leader when the 
FETCH request was handled. See KAFKA-18723 for

Review Comment:
   persisting up to the partition leader epoch of the leader when the FETCH 
request was handled => persisting up to the current leader epoch used in the 
fetch request



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to