Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-26 Thread via GitHub


satishd merged PR #14787:
URL: https://github.com/apache/kafka/pull/14787





Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-26 Thread via GitHub


satishd commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1827078547

   Test failures in the Jenkins jobs are unrelated to this change; merging it to trunk.





Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-22 Thread via GitHub


kamalcph commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1823088481

   @satishd 
   
   Addressed your review comments. PTAL. 





Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-21 Thread via GitHub


satishd commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache

Review Comment:
   They may not always be equal: the leader-epoch-cache can be truncated (because of log truncation for some reason) after the latestEntry is accessed.
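
   A minimal sketch of that concern, using a placeholder `EpochLookup` interface in place of `LeaderEpochFileCache` (an illustration only, not the code that was eventually committed):

   ```java
   // EpochLookup stands in for LeaderEpochFileCache; only the epoch-consistency
   // check is illustrated here.
   import java.util.Map;
   import java.util.Optional;

   interface EpochLookup {
       // Returns (epochActuallyUsed, endOffset) for the requested epoch.
       Map.Entry<Integer, Long> endOffsetFor(int epoch, long logEndOffset);
   }

   final class EpochGuardSketch {
       static Optional<Long> endOffsetIfEpochUnchanged(EpochLookup cache, int epoch, long logEndOffset) {
           Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, logEndOffset);
           if (entry.getKey() != epoch) {
               // The cache was truncated between reading the latest entry and this lookup;
               // the caller should restart the traversal from the current latest entry.
               return Optional.empty();
           }
           return Optional.of(entry.getValue());
       }
   }
   ```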






Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-21 Thread via GitHub


satishd commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache

Review Comment:
   They may not always be equal if the leader-epoch-cache gets truncated because of log truncation for some reason.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache
+                    // in descending order.
+                    if (endOffset <= highestRemoteOffset) {
+                        LOGGER.warn("The end-offset for epoch {}: ({}, {}) is less than or equal to the " +

Review Comment:
   I do not think this should be a WARN message, since unclean leader election can happen depending on the topic configuration. We can leave it at INFO level.




Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-20 Thread via GitHub


clolov commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1399015272


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -735,7 +781,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
         verify(remoteStorageManager, times(1)).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
 
         // Verify we should not have updated the highest offset because of write failure
-        verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong());
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(anyLong());

Review Comment:
   ```suggestion
   verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
   ```
   `times(1)` is implicit.
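
   For illustration, a standalone JUnit 5 / Mockito test (not from this PR) showing that `verify(mock)` and `verify(mock, times(1))` are equivalent:

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;

   import java.util.List;
   import org.junit.jupiter.api.Test;

   class VerifyDefaultTimesTest {

       @Test
       @SuppressWarnings("unchecked")
       void verifyDefaultsToTimesOne() {
           List<String> uploads = (List<String>) mock(List.class);
           uploads.add("segment-0");

           verify(uploads).add("segment-0");           // implicit times(1)
           verify(uploads, times(1)).add("segment-0"); // equivalent, just more verbose
       }
   }
   ```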



##
storage/src/main/java/org/apache/kafka/storage/internals/log/EpochAndOffset.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.Objects;
+
+public class EpochAndOffset {

Review Comment:
   Can you not use one of `org.apache.kafka.server.common.OffsetAndEpoch` or 
`org.apache.kafka.raft.OffsetAndEpoch`? Or is this because of trying to 
minimise package dependencies?
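
   For reference, a minimal sketch of what such an offset/epoch value class usually amounts to (names and shape are assumptions for illustration, not the body of the `EpochAndOffset` class in this PR):

   ```java
   import java.util.Objects;

   // A plain value pair, similar in shape to the existing OffsetAndEpoch classes.
   public final class EpochOffsetPair {
       private final int epoch;
       private final long offset;

       public EpochOffsetPair(int epoch, long offset) {
           this.epoch = epoch;
           this.offset = offset;
       }

       public int epoch() {
           return epoch;
       }

       public long offset() {
           return offset;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof EpochOffsetPair)) return false;
           EpochOffsetPair that = (EpochOffsetPair) o;
           return epoch == that.epoch && offset == that.offset;
       }

       @Override
       public int hashCode() {
           return Objects.hash(epoch, offset);
       }

       @Override
       public String toString() {
           return "EpochOffsetPair(epoch=" + epoch + ", offset=" + offset + ")";
       }
   }
   ```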



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -775,7 +821,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
         verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
         verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
-        verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(anyLong());

Review Comment:
   ```suggestion
   verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
   ```
   As above






Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


clolov commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1816112742

   I will aim to provide a review today!





Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


kamalcph commented on code in PR #14787:
URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
             // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
             // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
             // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
-            copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+            copiedOffsetOption = Optional.of(findHighestRemoteOffset(topicIdPartition, log));
             logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " +
                     "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
+            copiedOffsetOption.ifPresent(epochAndOffset -> log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));

Review Comment:
   After a broker restart, if there are no more segments to upload, the `copiedOffset` might be stale. It's good to update the `highestOffsetInRemoteStorage` in the UnifiedLog once we compute it.
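
   A small sketch of that idea with placeholder names (`LogStateSketch` stands in for UnifiedLog; the monotonic update is a choice made for this sketch, not a claim about UnifiedLog's behaviour):

   ```java
   import java.util.Optional;
   import java.util.concurrent.atomic.AtomicLong;

   final class LogStateSketch {
       private final AtomicLong highestOffsetInRemoteStorage = new AtomicLong(-1L);

       void updateHighestOffsetInRemoteStorage(long offset) {
           // Never move the value backwards, even under concurrent updates.
           highestOffsetInRemoteStorage.accumulateAndGet(offset, Math::max);
       }

       long highestOffsetInRemoteStorage() {
           return highestOffsetInRemoteStorage.get();
       }

       // Mirrors the added line in the diff above: without this push, the value would
       // stay stale until the next successful segment upload.
       static void onBecomeLeader(Optional<Long> copiedOffset, LogStateSketch log) {
           copiedOffset.ifPresent(log::updateHighestOffsetInRemoteStorage);
       }
   }
   ```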






[PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-17 Thread via GitHub


kamalcph opened a new pull request, #14787:
URL: https://github.com/apache/kafka/pull/14787

   `findHighestRemoteOffset` does not take the leader-epoch end offset into account. This can cause log divergence between the local and remote log segments when there is an unclean leader election.
   
   To handle it correctly, the logic to find the highest remote offset can be 
updated to:
   
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
   ```
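
   A minimal sketch of this computation, with assumed placeholder interfaces standing in for `LeaderEpochFileCache` and `RemoteLogMetadataManager` (an illustration of the approach, not the PR's code):

   ```java
   import java.util.Optional;
   import java.util.OptionalInt;

   interface EpochCheckpoint {
       OptionalInt latestEpoch();
       OptionalInt previousEpoch(int epoch);
       long endOffsetFor(int epoch, long logEndOffset);
   }

   interface RemoteMetadata {
       Optional<Long> highestOffsetForEpoch(int epoch);
   }

   final class HighestRemoteOffsetSketch {
       // Walk leader epochs from newest to oldest; for the first epoch that has data in
       // remote storage, cap the remote offset by that epoch's end offset from the local
       // leader-epoch checkpoint. Returns -1 when nothing has been copied yet.
       static long findHighestRemoteOffset(EpochCheckpoint checkpoint, RemoteMetadata remote, long logEndOffset) {
           OptionalInt epoch = checkpoint.latestEpoch();
           while (epoch.isPresent()) {
               int e = epoch.getAsInt();
               Optional<Long> remoteOffset = remote.highestOffsetForEpoch(e);
               if (remoteOffset.isPresent()) {
                   long endOffsetForEpoch = checkpoint.endOffsetFor(e, logEndOffset);
                   // min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
                   return Math.min(endOffsetForEpoch, remoteOffset.get());
               }
               epoch = checkpoint.previousEpoch(e);
           }
           return -1L;
       }
   }
   ```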
   
   Discussion thread: https://github.com/apache/kafka/pull/14004#discussion_r1266864272
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

