[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 GitBox


hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651378054



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   Yeah, makes sense. I was considering whether it would be useful to
have a `SnapshotId` object. Currently we use `OffsetAndEpoch` in other cases,
but a separate object would let us have better names. It would also let
us define separate methods for the inclusive and exclusive end offsets.
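A minimal sketch of what such a `SnapshotId` type could look like. The class name comes from the comment and `lastContainedOffset` from the naming suggestion below; `endOffsetExclusive` and the rest are hypothetical. It assumes the id's offset is an exclusive end offset, so the last offset contained in the snapshot is one less.

```java
import java.util.Objects;

/**
 * Hypothetical value type (not part of Kafka) sketching the suggested SnapshotId.
 * Assumption: the end offset is exclusive, so the snapshot covers every record
 * with offset < endOffset.
 */
final class SnapshotId {
    private final long endOffset;
    private final int epoch;

    SnapshotId(long endOffset, int epoch) {
        if (endOffset < 0) {
            throw new IllegalArgumentException("endOffset must be non-negative: " + endOffset);
        }
        this.endOffset = endOffset;
        this.epoch = epoch;
    }

    /** Exclusive bound: the offset of the first record NOT contained in the snapshot. */
    long endOffsetExclusive() {
        return endOffset;
    }

    /** Inclusive bound: the last offset contained in the snapshot (-1 for an empty snapshot). */
    long lastContainedOffset() {
        return endOffset - 1;
    }

    int epoch() {
        return epoch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SnapshotId)) return false;
        SnapshotId that = (SnapshotId) o;
        return endOffset == that.endOffset && epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(endOffset, epoch);
    }

    @Override
    public String toString() {
        return "SnapshotId(endOffset=" + endOffset + ", epoch=" + epoch + ")";
    }
}
```

Under this sketch, a method like the one under review would just delegate to `lastContainedOffset()`, and callers could no longer confuse the two bounds.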
   





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-14 Thread GitBox


hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
 return snapshot.snapshotId();
 }
 
+/**
+ * Returns the last log offset which is represented in the snapshot.
+ */
+public long lastOffsetFromLog() {

Review comment:
   Maybe something like `lastIncludedOffset` or `lastContainedOffset`?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
 appendBatch(numberOfRecords, epoch);
 log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) {
 snapshot.freeze();
 }
 
 RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
 assertEquals(0, snapshot.sizeInBytes());
 }
 
+@Test
+public void testCreateSnapshotValidation() {
+int numberOfRecords = 10;
+int firstEpoch = 1;
+int secondEpoch = 3;
+
+appendBatch(numberOfRecords, firstEpoch);
+appendBatch(numberOfRecords, secondEpoch);
+log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+// Test snapshot id for the first epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+// Test snapshot id for the second epoch
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+}
+
+@Test
+public void testCreateSnapshotLaterThanHighWatermark() {
+int numberOfRecords = 10;
+int epoch = 1;
+
+appendBatch(numberOfRecords, epoch);
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+assertThrows(
+IllegalArgumentException.class,
+() -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true)
+);
+}
+
+@Test
+public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
   Worth adding any test cases for an invalid epoch?
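One way to pin down what an "invalid epoch" means, as a minimal sketch: assume a snapshot id (endOffset, epoch) is valid only if the record at endOffset - 1 was written in that epoch. This rule is an assumption, not MockLog's actual logic, but it accepts every id used in `testCreateSnapshotValidation` above. `SnapshotIdValidator`, `addEpoch`, `epochAt` and `isValid` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical validator, not MockLog's or KafkaMetadataLog's logic. Assumed rule:
 * a snapshot id (endOffset, epoch) is valid when its last contained offset
 * (endOffset - 1) is still in the log, is below the high-watermark, and was
 * written in exactly that epoch.
 */
final class SnapshotIdValidator {
    // start offset of each epoch -> epoch; start offsets grow with the epoch
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    private final long logStartOffset;
    private final long highWatermark;

    SnapshotIdValidator(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    void addEpoch(int epoch, long startOffset) {
        epochByStartOffset.put(startOffset, epoch);
    }

    /** Epoch of the record at the given offset, or -1 if no known epoch covers it. */
    int epochAt(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        return entry == null ? -1 : entry.getValue();
    }

    boolean isValid(long endOffset, int epoch) {
        // For simplicity this sketch requires the last contained offset to still be in the log.
        if (endOffset > highWatermark || endOffset <= logStartOffset) {
            return false;
        }
        return epochAt(endOffset - 1) == epoch;
    }
}
```

With epoch 1 covering offsets 0 to 9 and epoch 3 covering 10 to 19, ids such as (10, 3), (11, 1) or (5, 2) would be natural invalid-epoch cases to test.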

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -180,14 +181,17 @@ default void beginShutdown() {}
 void resign(int epoch);
 
 /**
- * Create a writable snapshot file for a given offset and epoch.
+ * Create a writable snapshot file for a commmitted offset.

Review comment:
   nit: one extra 'm'

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1101,7 +1101,7 @@ private boolean handleFetchResponse(
 partitionResponse.snapshotId().epoch()
 );
 
-state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId)));
+state.setFetchingSnapshot(log.createSnapshot(snapshotId, false));

Review comment:
   Might be worth a brief comment that the snapshot is expected to be well 
ahead of the current log, so we have to skip validation.
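A minimal sketch of the contract implied by the `validate` flag, using a hypothetical in-memory log (`SketchLog` is not Kafka's `ReplicatedLog`, and the `String` return value stands in for a `RawSnapshotWriter`). It assumes, as the diffs suggest, that an existing snapshot yields `Optional.empty()` and that validation checks the end offset against the log start offset and the high-watermark.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical in-memory log used only to illustrate the validate flag; not Kafka's ReplicatedLog. */
final class SketchLog {
    private final Set<String> snapshotIds = new HashSet<>(); // ids stored as "endOffset-epoch"
    private final long logStartOffset;
    private final long highWatermark;

    SketchLog(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    /**
     * Returns Optional.empty() if the snapshot already exists, otherwise a placeholder
     * for the writer. With validate == true the end offset must lie within
     * [logStartOffset, highWatermark]; with validate == false the checks are skipped.
     */
    Optional<String> createSnapshot(long endOffset, int epoch, boolean validate) {
        if (validate) {
            if (endOffset > highWatermark) {
                throw new IllegalArgumentException(
                    "end offset " + endOffset + " is greater than the high-watermark " + highWatermark);
            }
            if (endOffset < logStartOffset) {
                throw new IllegalArgumentException(
                    "end offset " + endOffset + " is less than the log start offset " + logStartOffset);
            }
        }
        String id = endOffset + "-" + epoch;
        // Set.add returns false when the id is already present.
        return snapshotIds.add(id) ? Optional.of(id) : Optional.empty();
    }
}
```

In this model a follower handling a fetch response would pass `validate = false`, because the leader's snapshot is typically far beyond the follower's local high-watermark and the check would otherwise throw.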

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
  * Create a writable snapshot for the given snapshot id.
  *
  * See {@link RawSnapshotWriter} for details on how to use this object. The caller of
- * this method is responsible for invoking {@link RawSnapshotWriter#close()}.
+ * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
+ * snapshot already exists then return an {@link Optional#empty()}.
  *
  * @param snapshotId the end offset and epoch that identifies the snapshot
- * @return a writable snapshot
+ * @param validate validate the snapshot id against the log
+ * @return a writable snapshot if it doesn't already exists
+ * @throws IllegalArgumentException if validate is true and end offset is greater than the
+ * high-watermark
+ * @throws IllegalArgumentException if validate is true and end offset is 

[GitHub] [kafka] hachikuji commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-08 GitBox


hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r647803084



##
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##
@@ -74,56 +74,49 @@ String name() {
 this.batch = null;
 this.section = null;
 this.numRecords = 0;
-this.numWriteTries = 0;
 }
 
 /**
  * Returns the epoch of the snapshot that we are generating.
  */
 long epoch() {
-return writer.epoch();
+return writer.lastOffset();

Review comment:
   Is this correct? Seems likely to cause confusion if it is.

##
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext,
 snapshotRegistry, sessionTimeoutNs, replicaPlacer);
 this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
 this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
+this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot);

Review comment:
   Passing through the function is a tad odd. We actually could just use 
the implicit reference to `raftClient`. Was this done for testing or something?
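On the "was this done for testing?" question, a minimal sketch of the tradeoff. All names (`GeneratorManagerSketch`, `SnapshotWriterSketch`) and the `LongFunction` signature are hypothetical; the real `createSnapshot` signature in this PR may differ. Injecting a factory lets a test pass a lambda returning a fake writer, while holding `raftClient` directly ties the manager to a real client.

```java
import java.util.Optional;
import java.util.function.LongFunction;

/** Hypothetical stand-in for a snapshot writer; not a Kafka type. */
interface SnapshotWriterSketch {
    long endOffset();
}

/** Hypothetical manager that receives its writer factory instead of a RaftClient. */
final class GeneratorManagerSketch {
    // Production code could pass a method reference to the raft client here;
    // a test can pass a lambda that returns a fake writer or Optional.empty().
    private final LongFunction<Optional<SnapshotWriterSketch>> writerFactory;

    GeneratorManagerSketch(LongFunction<Optional<SnapshotWriterSketch>> writerFactory) {
        this.writerFactory = writerFactory;
    }

    /** Returns true if a writer was obtained for the given committed offset. */
    boolean maybeStart(long committedOffset) {
        return writerFactory.apply(committedOffset).isPresent();
    }
}
```

The cost is one more level of indirection in production code; if the tests can build a mock raft client anyway, the direct reference the reviewer suggests is simpler.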

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -233,18 +233,40 @@ final class KafkaMetadataLog private (
 log.topicId.get
   }
 
-  override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = {
-// Do not let the state machine create snapshots older than the latest snapshot
-latestSnapshotId().ifPresent { latest =>
-  if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) {
-// Since snapshots are less than the high-watermark absolute offset comparison is okay.
-throw new IllegalArgumentException(
-  s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)"
-)
-  }
+  override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = {
+if (snapshots.contains(snapshotId)) {
+  Optional.empty()
+} else {
+  Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this)))
+}
+  }
+
+  override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = {
+val highWatermarkOffset = highWatermark.offset
+if (endOffset > highWatermarkOffset) {
+  throw new IllegalArgumentException(
+s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)"
+  )
+}
+
+if (endOffset < startOffset) {
+  throw new IllegalArgumentException(
+s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)"
+  )
+}
+
+val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match {
+  case Some(epochEntry) =>
+epochEntry.epoch
+  case None =>
+// Assume that the end offset falls in the current epoch since based on the check above:

Review comment:
   This confuses me a little bit. The logic in `findEpochEntryByEndOffset`
returns the first epoch, scanning from the most recent, whose start offset is
less than the end offset. Wouldn't that already cover the case of the current
epoch? It seems like the only uncovered case is when the offset is smaller than
the start offset of the first cached epoch, but shouldn't that be an error?
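A minimal model of the lookup as described in the comment, assuming the cache scans epochs from newest to oldest and returns the first whose start offset is strictly below the end offset. `EpochCacheSketch` is a hypothetical stand-in, not `LeaderEpochFileCache`.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical model of a leader epoch cache; not Kafka's LeaderEpochFileCache. */
final class EpochCacheSketch {
    // epoch -> start offset; start offsets increase with the epoch
    private final NavigableMap<Integer, Long> startOffsets = new TreeMap<>();

    void assign(int epoch, long startOffset) {
        startOffsets.put(epoch, startOffset);
    }

    /**
     * Scans from the newest epoch backwards and returns the first epoch whose
     * start offset is strictly less than endOffset, or empty if none is.
     */
    Optional<Integer> findEpochByEndOffset(long endOffset) {
        for (Map.Entry<Integer, Long> entry : startOffsets.descendingMap().entrySet()) {
            if (entry.getValue() < endOffset) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
```

Under this model the newest (current) epoch is returned for any end offset past its start offset, so the `None` branch is reached only when the end offset is at or before the first cached epoch's start offset. That matches the reviewer's suggestion to treat it as an error.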



