[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267137597

## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ##

@@ -172,15 +176,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive).

Review Comment:
OK, we can use `requiredBaseOffset`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267142346

## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ##

@@ -89,52 +91,12 @@ public BatchAccumulator(
         this.appendLock = new ReentrantLock();
     }

-    /**
-     * Append a list of records into as many batches as necessary.
-     *
-     * The order of the elements in the records argument will match the order in the batches.
-     * This method will use as many batches as necessary to serialize all of the records. Since
-     * this method can split the records into multiple batches it is possible that some of the
-     * records will get committed while other will not when the leader fails.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in the batches
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
-     *         batch size; if this exception is throw some of the elements in records may have
-     *         been committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long append(int epoch, List records) {
-        return append(epoch, records, false);
-    }
-
-    /**
-     * Append a list of records into an atomic batch. We guarantee all records are included in the
-     * same underlying record batch so that either all of the records become committed or none of
-     * them do.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
-     *         batch size; if this exception is throw none of the elements in records were
-     *         committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long appendAtomic(int epoch, List records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {

Review Comment:
OK.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##

@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
> This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log.

That does not matter here, since I only plan to use this method when the leader becomes active. We will not use it after that.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273742201

## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ##

@@ -147,6 +126,13 @@ private long append(int epoch, List records, boolean isAtomic) {
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
OK.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##

@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
> This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log.

That does not matter here, since I only plan to use this method when the leader becomes active. We will not use it after that.

I also feel that a method named `logEndOffset` should just return the log end offset. Returning something else would be misleading.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##

@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
> This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log.

I feel that a method named `logEndOffset` should just return the log end offset. Returning something else would be misleading.

In any case, I only plan to use this method when the leader becomes active. We will not use it after that.
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##

@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
> This is not correct in all cases. The leader can have records in the batch accumulator that have not been sent to the log.

Hmm... the method name is `logEndOffset`, so it should just return the log end offset, right? Returning something else would be misleading.

In any case, we only need this method when the leader becomes active. We will not use it after that.

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##

@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
         return log.latestSnapshotId();
     }

+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
> Can we also add tests for this new functionality?

Yes, good point. I will add a test for the `requiredBaseOffset` parameter and the `logEndOffset` method.
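To make the distinction in this thread concrete, here is a minimal sketch of a log with a separate in-memory accumulator. It is not Kafka's code: `SketchRaftLog` and all of its members are hypothetical, and it assumes the log end offset is the offset of the next record to be written to the log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not a Kafka class.
class SketchRaftLog {
    // Records that have actually been written to the log.
    private final List<String> log = new ArrayList<>();
    // Records scheduled by the leader but not yet written to the log.
    private final List<String> accumulator = new ArrayList<>();

    // Assumption: the log end offset is the offset of the next record
    // to be written to the log. It ignores the accumulator entirely.
    long logEndOffset() {
        return log.size();
    }

    // The offset that the next scheduled record would receive.
    long nextScheduledOffset() {
        return log.size() + accumulator.size();
    }

    void scheduleAppend(String record) {
        accumulator.add(record);
    }

    // Moves every pending record from the accumulator into the log.
    void flush() {
        log.addAll(accumulator);
        accumulator.clear();
    }
}
```

In this model the two offsets differ only while records are pending. If the accumulator is empty at the moment a node becomes the active leader, reading `logEndOffset()` at that point gives the same value as `nextScheduledOffset()`.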
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1275472446

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ##

@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }

-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                     "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);

Review Comment:
Well, there is not really any difference between the previous iterations of this PR and the current one in this regard. If some component other than the controller is adding messages, our supplied `requiredBaseOffset` may be invalid. It is only a snapshot of the offset at a point in time, after all. That is why we check `requiredBaseOffset` in `scheduleAtomicAppend`.
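The check described in this comment could look roughly like the sketch below. This is an illustration under stated assumptions, not the code in the PR: `SketchAccumulator` and `SketchUnexpectedBaseOffsetException` are hypothetical stand-ins, and the real accumulator also deals with epochs, batching, and memory allocation, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for the exception named in the PR; not Kafka's class.
class SketchUnexpectedBaseOffsetException extends RuntimeException {
    SketchUnexpectedBaseOffsetException(String message) {
        super(message);
    }
}

// Hypothetical, simplified accumulator that only tracks offsets.
class SketchAccumulator {
    private final List<String> records = new ArrayList<>();
    private long nextOffset;

    SketchAccumulator(long initialNextOffset) {
        this.nextOffset = initialNextOffset;
    }

    // Appends all of the records or none of them.
    // Returns the offset of the last appended record.
    synchronized long appendAtomic(OptionalLong requiredBaseOffset, List<String> newRecords) {
        // The caller's base offset is only a snapshot. If anything else has
        // appended since the snapshot was taken, reject the whole append.
        if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextOffset) {
            throw new SketchUnexpectedBaseOffsetException(
                "Wanted base offset " + requiredBaseOffset.getAsLong() +
                ", but the next offset is " + nextOffset);
        }
        records.addAll(newRecords);
        nextOffset += newRecords.size();
        return nextOffset - 1;
    }

    synchronized long nextOffset() {
        return nextOffset;
    }
}
```

Because the comparison and the append happen under the same lock, a stale snapshot causes the append to fail rather than land at an offset the caller did not expect. Passing `OptionalLong.empty()` skips the check.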
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276690892

## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ##

@@ -171,16 +172,21 @@ default void beginShutdown() {}
      * to resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
+     * If the current base offset does not match the supplied required base offset,
+     * then this method will throw {@link UnexpectedBaseOffsetException}.
+     *
      * @param epoch the current leader epoch
+     * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
      * @param records the list of records to append
      * @return the expected offset of the last record if append succeed
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException we failed to allocate memory for the records
+     * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained.
      */
-    long scheduleAtomicAppend(int epoch, List records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records);

Review Comment:
The controller doesn't use `scheduleAppend`. We only use `scheduleAtomicAppend`.
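For readers following along, a caller of the new signature might look something like this. It is a sketch, not the controller's code: `SketchAtomicAppender` mirrors only the method shape shown in the diff, `SketchController` and its fields are hypothetical, and passing "last write offset + 1" as the required base offset is an assumption made for the illustration.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical interface that mirrors only the method shape in the diff.
interface SketchAtomicAppender {
    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<String> records);
}

// Hypothetical caller; names are illustrative, not QuorumController's.
class SketchController {
    private final SketchAtomicAppender appender;
    private int claimedEpoch = -1;
    private long lastWriteOffset = -1L;

    SketchController(SketchAtomicAppender appender) {
        this.appender = appender;
    }

    // Called once when this node becomes the active leader. The offset
    // passed in is a snapshot taken at that moment.
    void claim(int epoch, long newLastWriteOffset) {
        if (claimedEpoch != -1) {
            throw new IllegalStateException("Already the active controller.");
        }
        claimedEpoch = epoch;
        lastWriteOffset = newLastWriteOffset;
    }

    // Writes records atomically and insists on the exact base offset
    // (assumed here to be the last write offset plus one).
    long write(List<String> records) {
        if (claimedEpoch == -1) {
            throw new IllegalStateException("Not the active controller.");
        }
        long last = appender.scheduleAtomicAppend(
            claimedEpoch, OptionalLong.of(lastWriteOffset + 1), records);
        lastWriteOffset = last;
        return last;
    }
}
```

With this shape the caller knows the offset of every record before the append is scheduled, and an append that would land anywhere else fails instead of silently shifting.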