jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r594509680



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -732,13 +760,17 @@ public int hashCode() {
     }
 
     private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
+        appendAsLeader(records, epoch, log.endOffset().offset);
+    }
+
+    private void appendAsLeader(Collection<SimpleRecord> records, int epoch, 
long initialOffset) {
         log.appendAsLeader(
-            MemoryRecords.withRecords(
-                log.endOffset().offset,
-                CompressionType.NONE,
-                records.toArray(new SimpleRecord[records.size()])
-            ),
-            epoch
+                MemoryRecords.withRecords(
+                        initialOffset,
+                        CompressionType.NONE,
+                        records.toArray(new SimpleRecord[records.size()])
+                ),
+                epoch

Review comment:
       Indentation looks off. We indent 4 spaces.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
     assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-    assertEquals(ValidOffsetAndEpoch.Type.VALID, 
resultOffsetAndEpoch.getType())
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
     assertEquals(new OffsetAndEpoch(offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): 
Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
     val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
 
-    val numberOfRecords = 1
-    val epoch = 1
 
-    append(log, numberOfRecords, epoch)
+    append(log, numOfRecords, epoch = 1, initialOffset = 10)
+    append(log, numOfRecords, epoch = 2, initialOffset = 15)
+    append(log, numOfRecords, epoch = 4, initialOffset = 20)
 
-    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch + 10)
-    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, 
resultOffsetAndEpoch.getType())
-    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
+    // offset is not equal to oldest snapshot's offset
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, 
resultOffsetAndEpoch.getType)
+    assertEquals(new OffsetAndEpoch(20, 2), 
resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateUnknownEpochLessThanLeaderGreaterThanOldestSnapshot(): Unit 
= {

Review comment:
       How about `testValidateEpochLessThanFirstEpochInLog`? If you agree, 
let's change it in `MockLogTest` also.

##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -25,7 +27,7 @@
         this.offsetAndEpoch = offsetAndEpoch;
     }
 
-    public Type type() {
+    public Type getType() {

Review comment:
       By the way, we could also just rename the type and the field, ending up 
with something like `public Kind kind()`.

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -750,4 +782,13 @@ private void appendBatch(int numRecords, int epoch) {
 
         appendAsLeader(records, epoch);
     }
+
+    private void appendBatch(int numRecords, int epoch, long initialOffset) {

Review comment:
       How about `private void appendBatch(int numberOfRecords, int epoch)` and 
always use the LEO (log end offset), the same way 
`appendAsLeader(Collection<SimpleRecord> records, int epoch)` does?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
     assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-    assertEquals(ValidOffsetAndEpoch.Type.VALID, 
resultOffsetAndEpoch.getType())
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
     assertEquals(new OffsetAndEpoch(offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): 
Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
     val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
 
-    val numberOfRecords = 1
-    val epoch = 1
 
-    append(log, numberOfRecords, epoch)
+    append(log, numOfRecords, epoch = 1, initialOffset = 10)

Review comment:
       I think we should just change the signature to `def append(log: 
ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo`. The 
implementation of `append` should just use the value in `log.endOffset.offset` 
to set the batch's base offset.
   
   For a little bit of background, the semantics of 
`ReplicatedLog::appendAsLeader` were recently changed from setting the right 
base offset to expecting the right base offset.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to