[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267137597


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -172,15 +176,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive).

Review Comment:
   ok. we can use `requiredBaseOffset`.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267142346


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -89,52 +91,12 @@ public BatchAccumulator(
         this.appendLock = new ReentrantLock();
     }
 
-    /**
-     * Append a list of records into as many batches as necessary.
-     *
-     * The order of the elements in the records argument will match the order in the batches.
-     * This method will use as many batches as necessary to serialize all of the records. Since
-     * this method can split the records into multiple batches it is possible that some of the
-     * records will get committed while others will not when the leader fails.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in the batches
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
-     *         batch size; if this exception is thrown some of the elements in records may have
-     *         been committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long append(int epoch, List<T> records) {
-        return append(epoch, records, false);
-    }
-
-    /**
-     * Append a list of records into an atomic batch. We guarantee all records are included in the
-     * same underlying record batch so that either all of the records become committed or none of
-     * them do.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
-     *         batch size; if this exception is thrown none of the elements in records were
-     *         committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List<T> records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {

Review Comment:
   ok
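
For readers following the API change: the diff above folds the former append and appendAtomic entry points into one method that also accepts an optional pinned end offset. Below is a minimal, self-contained sketch of that shape. It is not the real BatchAccumulator (which is generic over the record type and protected by appendLock); the class name, record type, and exception choice are illustrative assumptions.

    import java.util.List;
    import java.util.OptionalLong;

    // Illustrative stand-in for the unified append(...) signature shown in the diff.
    final class AccumulatorSketch {
        private long nextOffset = 0;

        public long append(int epoch, List<String> records, OptionalLong requiredEndOffset, boolean isAtomic) {
            // The last offset this batch of records would occupy.
            long endOffset = nextOffset + records.size() - 1;
            // If the caller pinned an end offset, reject the append on mismatch.
            requiredEndOffset.ifPresent(required -> {
                if (required != endOffset) {
                    throw new IllegalStateException("Required end offset " + required +
                        " does not match computed end offset " + endOffset);
                }
            });
            nextOffset = endOffset + 1;
            return endOffset;
        }

        public static void main(String[] args) {
            AccumulatorSketch acc = new AccumulatorSketch();
            acc.append(1, List.of("a", "b"), OptionalLong.empty(), false); // former append(epoch, records)
            acc.append(1, List.of("c"), OptionalLong.of(2), true);         // former appendAtomic(...), pinned to end offset 2
        }
    }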






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-25 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I don't care about that, though, since I only plan on using this method when the leader becomes active. We will not use it after that.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-25 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273742201


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   ok
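
Worked example of the line above: if nextOffset is 100 and three records are appended, the batch occupies offsets 100, 101, and 102, so endOffset is 102. In other words, the computed end offset is inclusive of the last record, matching the `requiredEndOffset` javadoc earlier in the thread.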






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-25 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I don't care about that, though, since I only plan on using this method when the leader becomes active. We will not use it after that. I also feel that a method named "logEndOffset" should just return the log end offset. Returning something else would be misleading.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-25 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I feel that a method named "logEndOffset" should just return the log end offset. Returning something else would be misleading.
   
   In any case, I only plan on using this method when the leader becomes active. We will not use it after that.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-25 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.
   
   In any case, we only need this method when the leader becomes active. We will not use it after that.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > Can we also add tests for this new functionality?
   
   Yes, good point. I will add a test for the `requiredEndOffset` parameter and the `logEndOffset` method.






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-26 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1275472446


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                     "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);

Review Comment:
   Well, there is not really any difference between the previous iterations of this PR and the current one in this regard. If some other component that isn't the controller is adding messages, our supplied `requiredBaseOffset` may be invalid. It is only a snapshot of the offset at a point in time, after all. That is why we check `requiredBaseOffset` in `atomicAppend`.
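
A simplified sketch of that check, under the assumption that it amounts to comparing the caller's point-in-time snapshot against the accumulator's next offset and rejecting the append on mismatch; apart from the `requiredBaseOffset` and `UnexpectedBaseOffsetException` terminology from the PR, the names and numbers below are illustrative, not the real BatchAccumulator code.

    import java.util.List;
    import java.util.OptionalLong;

    final class BaseOffsetGuardSketch {
        static final class UnexpectedBaseOffsetException extends RuntimeException {
            UnexpectedBaseOffsetException(String message) { super(message); }
        }

        private long nextOffset = 100;

        synchronized long atomicAppend(List<String> records, OptionalLong requiredBaseOffset) {
            // Reject the append if the log has moved past the caller's snapshot.
            requiredBaseOffset.ifPresent(required -> {
                if (required != nextOffset) {
                    throw new UnexpectedBaseOffsetException("Wanted base offset " + required +
                        ", but the next offset is " + nextOffset);
                }
            });
            long lastOffset = nextOffset + records.size() - 1;
            nextOffset = lastOffset + 1;
            return lastOffset;
        }

        public static void main(String[] args) {
            BaseOffsetGuardSketch log = new BaseOffsetGuardSketch();
            log.atomicAppend(List.of("a", "b"), OptionalLong.of(100)); // ok: occupies offsets 100-101
            try {
                log.atomicAppend(List.of("c"), OptionalLong.of(100));  // stale snapshot: next offset is now 102
            } catch (UnexpectedBaseOffsetException e) {
                System.out.println("Rejected: " + e.getMessage());
            }
        }
    }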






[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276690892


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -171,16 +172,21 @@ default void beginShutdown() {}
      * to resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
+     * If the current base offset does not match the supplied required base offset,
+     * then this method will throw {@link UnexpectedBaseOffsetException}.
+     *
      * @param epoch the current leader epoch
+     * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
      * @param records the list of records to append
      * @return the expected offset of the last record if the append succeeds
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is thrown none of the elements in records were
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException if we failed to allocate memory for the records
      * @throws UnexpectedBaseOffsetException if the requested base offset could not be obtained.
      */
-    long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);

Review Comment:
   The controller doesn't use `scheduleAppend`. We only use `scheduleAtomicAppend`.
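
A call-side sketch of the new signature: only the scheduleAtomicAppend shape and the "controller always pins the base offset" point come from the diff and comment above; the interface is a trimmed, non-generic stand-in for RaftClient (the real client is generic, and the controller passes ApiMessageAndVersion records), and the in-memory implementation exists only so the example runs.

    import java.util.List;
    import java.util.OptionalLong;

    interface AtomicAppender {
        long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<String> records);
    }

    final class ControllerCallSiteSketch {
        public static void main(String[] args) {
            // Fake appender: pretends the batch landed starting at the required base offset.
            AtomicAppender raftClient = (ep, baseOffset, recs) ->
                baseOffset.orElse(0L) + recs.size() - 1;

            int epoch = 7;
            long expectedBaseOffset = 500L; // the offset the controller believes comes next

            // The controller always pins the base offset; it never uses scheduleAppend.
            long lastOffset = raftClient.scheduleAtomicAppend(
                epoch, OptionalLong.of(expectedBaseOffset), List.of("recordA", "recordB"));
            System.out.println("last appended offset = " + lastOffset); // 501
        }
    }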


