[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597309028



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   @hachikuji I made changes to reuse the `snapshotId` variable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597158051



##
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-final private Type type;
+final private Kind kind;

Review comment:
   I agree with the existing design and with the limitations of interfaces
   compared to case classes. Java 15 adds sealed classes as a preview feature
   (JEP 360: https://openjdk.java.net/jeps/360), which would be a good
   alternative to plain interfaces because they let us limit the extension.
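
To illustrate the sealed-class idea mentioned above, here is a minimal sketch. It is not Kafka's `ValidOffsetAndEpoch`; the names `SealedResultSketch`, `Result`, `Valid`, `Diverging`, `Snapshot`, and `describe` are hypothetical and chosen only for this example. It assumes Java 17 or later, where sealed types and records are final features (in Java 15 they were available only as previews).

```java
// Hypothetical sketch: a closed set of validation outcomes modelled with a
// sealed interface. None of these names come from the Kafka code base.
class SealedResultSketch {

    // Only the three records listed in `permits` may implement Result,
    // which is the "limited extension" that Scala case classes under a
    // sealed trait also provide.
    sealed interface Result permits Valid, Diverging, Snapshot {
        long offset();
        int epoch();
    }

    // Records are implicitly final, so each one is a legal permitted subtype.
    record Valid(long offset, int epoch) implements Result {}
    record Diverging(long offset, int epoch) implements Result {}
    record Snapshot(long offset, int epoch) implements Result {}

    // Dispatch on the concrete subtype. The if-chain is not checked for
    // exhaustiveness by the compiler, hence the final throw.
    static String describe(Result result) {
        if (result instanceof Valid v) {
            return "VALID at " + v.offset() + "/" + v.epoch();
        } else if (result instanceof Diverging d) {
            return "DIVERGING, truncate to " + d.offset() + "/" + d.epoch();
        } else if (result instanceof Snapshot s) {
            return "SNAPSHOT, fetch " + s.offset() + "/" + s.epoch();
        }
        throw new IllegalStateException("unreachable: Result is sealed");
    }

    public static void main(String[] args) {
        System.out.println(describe(new Valid(5, 1)));
        System.out.println(describe(new Diverging(3, 1)));
        System.out.println(describe(new Snapshot(10, 2)));
    }
}
```

Compared with an enum field such as `Kind`, this shape lets each outcome carry its own data, at the cost of requiring a newer Java release than the project may be able to target.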








[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   @hachikuji can you explain what you mean by using `snapshotId`? Do you mean
   changing
   `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())`
   to
   `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())`
   so that the `numberOfRecords` and `epoch` variables are not used?
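
One possible reading of the suggestion, offered here as an assumption because the reviewer's original comment is not quoted in this message, is to pass `snapshotId` itself as the expected value instead of rebuilding an equal object. The sketch below uses a hypothetical stand-in for `OffsetAndEpoch` with value-based equality, which the quoted `assertEquals(new OffsetAndEpoch(...), ...)` calls already rely on. `validateAgainst` and `allFormsAgree` are made-up helpers, not Kafka APIs.

```java
import java.util.Objects;

// Hypothetical sketch: three equivalent ways to state the same expectation.
class SnapshotIdAssertionSketch {

    // Stand-in for an offset/epoch pair with value-based equality.
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof OffsetAndEpoch)) return false;
            OffsetAndEpoch that = (OffsetAndEpoch) other;
            return offset == that.offset && epoch == that.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(offset, epoch);
        }
    }

    // Made-up stub for the call under test: it answers with a fresh object
    // equal to the oldest snapshot id, as in the SNAPSHOT case of the test.
    static OffsetAndEpoch validateAgainst(OffsetAndEpoch oldestSnapshotId) {
        return new OffsetAndEpoch(oldestSnapshotId.offset, oldestSnapshotId.epoch);
    }

    // Returns true when all three ways of writing the expected value agree.
    static boolean allFormsAgree(long numberOfRecords, int epoch) {
        OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
        OffsetAndEpoch result = validateAgainst(snapshotId);

        // 1. Original form: rebuild the expected value from the raw variables.
        boolean fromVariables = new OffsetAndEpoch(numberOfRecords, epoch).equals(result);
        // 2. Form asked about in the comment: rebuild it from snapshotId's fields.
        boolean fromFields = new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch).equals(result);
        // 3. Shortest form: compare against snapshotId directly.
        boolean direct = snapshotId.equals(result);

        return fromVariables && fromFields && direct;
    }

    public static void main(String[] args) {
        System.out.println(allFormsAgree(10, 1));
    }
}
```

Under that reading, the third form states the intent most directly: the validation result should point at the snapshot that was just created.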








[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-13 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r593859477



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val offset = 2
+val epoch = 1
+
+append(log, offset, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+val snapshotId = new OffsetAndEpoch(offset, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val offset = 2
+val epoch = 1
+
+append(log, offset, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+val snapshotId = new OffsetAndEpoch(offset, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)

Review comment:
   Added testValidateOffsetLessThanLEO test.

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -594,6 +594,102 @@ public void testDoesntTruncateFully() throws IOException {
 assertFalse(log.truncateToLatestSnapshot());
 }
 
+@Test

Review comment:
   done.









[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-13 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r593859421



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val offset = 2
+val epoch = 1
+
+append(log, offset, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+val snapshotId = new OffsetAndEpoch(offset, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val offset = 2
+val epoch = 1
+
+append(log, offset, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+val snapshotId = new OffsetAndEpoch(offset, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateValidEpochAndOffset(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch)

Review comment:
   done.




