Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd merged PR #14787: URL: https://github.com/apache/kafka/pull/14787

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1827078547

Test failures in the Jenkins jobs are unrelated to this change, merging it to trunk.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1823088481

@satishd Addressed your review comments. PTAL.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```diff
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache
```

Review Comment: It may not always be the same if truncation occurs in the leader-epoch-cache because of log truncation for some reason after the latestEntry is accessed.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
satishd commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1400289949

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```diff
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache
```

Review Comment: It may not always be the same if truncation occurs in the leader-epoch-cache because of log truncation for some reason.

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```diff
@@ -1422,20 +1428,39 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         return nextBatch;
     }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-
-        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-        if (maybeLeaderEpochFileCache.isDefined()) {
-            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-            OptionalInt epoch = cache.latestEpoch();
-            while (!offset.isPresent() && epoch.isPresent()) {
-                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                epoch = cache.previousEpoch(epoch.getAsInt());
+    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        OffsetAndEpoch offsetAndEpoch = null;
+        Option<LeaderEpochFileCache> leaderEpochCacheOpt = log.leaderEpochCache();
+        if (leaderEpochCacheOpt.isDefined()) {
+            LeaderEpochFileCache cache = leaderEpochCacheOpt.get();
+            Optional<EpochEntry> maybeEpochEntry = cache.latestEntry();
+            while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) {
+                int epoch = maybeEpochEntry.get().epoch;
+                Optional<Long> highestRemoteOffsetOpt =
+                        remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                if (highestRemoteOffsetOpt.isPresent()) {
+                    Map.Entry<Integer, Long> entry = cache.endOffsetFor(epoch, log.logEndOffset());
+                    int requestedEpoch = entry.getKey();
+                    long endOffset = entry.getValue();
+                    long highestRemoteOffset = highestRemoteOffsetOpt.get();
+                    // It is implicit that the (epoch == requestedEpoch) since we are traversing the leader-epoch-cache
+                    // in descending order.
+                    if (endOffset <= highestRemoteOffset) {
+                        LOGGER.warn("The end-offset for epoch {}: ({}, {}) is less than or equal to the " +
```

Review Comment: I do not think it should be a WARN message here, since unclean leader election can happen depending on the topic configuration. We can leave it at INFO level.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
clolov commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1399015272

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

```diff
@@ -735,7 +781,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
         verify(remoteStorageManager, times(1)).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

         // Verify we should not have updated the highest offset because of write failure
-        verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong());
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(anyLong());
```

Review Comment:
```suggestion
        verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
```
`times(1)` is implicit.

## storage/src/main/java/org/apache/kafka/storage/internals/log/EpochAndOffset.java:

```diff
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.Objects;
+
+public class EpochAndOffset {
```

Review Comment: Can you not use one of `org.apache.kafka.server.common.OffsetAndEpoch` or `org.apache.kafka.raft.OffsetAndEpoch`? Or is this because of trying to minimise package dependencies?

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

```diff
@@ -775,7 +821,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
         verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
         verify(remoteStorageManager, never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
         verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
-        verify(mockLog, never()).updateHighestOffsetInRemoteStorage(anyLong());
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(anyLong());
```

Review Comment:
```suggestion
        verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
```
As above.
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
clolov commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1816112742

I will aim to provide a review today!
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```diff
@@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
         // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
         // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
         // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
-        copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+        copiedOffsetOption = Optional.of(findHighestRemoteOffset(topicIdPartition, log));
         logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " +
                 "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
+        copiedOffsetOption.ifPresent(epochAndOffset -> log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));
```

Review Comment: After a broker restart, if there are no more segments to upload, then the `copiedOffset` might be stale. It's good to update the `highestOffsetInRemoteStorage` in the UnifiedLog once we compute it.
[PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph opened a new pull request, #14787: URL: https://github.com/apache/kafka/pull/14787

`findHighestRemoteOffset` does not take the leader-epoch end offset into account. This can cause log divergence between the local and remote log segments when there is an unclean leader election. To handle it correctly, the logic to find the highest remote offset can be updated to:

```
find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
```

Discussion thread: https://github.com/apache/kafka/pull/14004#discussion_r1266864272

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
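The `min(...)` rule above can be illustrated with a small standalone sketch. This is not Kafka's implementation: the class, method, and `TreeMap`-based lookups below are hypothetical stand-ins for the leader-epoch checkpoint and the remote-log metadata lookup, kept only to show why capping the remote offset at the epoch's checkpointed end offset prevents divergence after an unclean election.

```java
import java.util.TreeMap;

// Hypothetical sketch of the epoch-aware lookup described in the PR text:
// walk leader epochs from latest to earliest, and for the first epoch that has
// data in remote storage, take
//   min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch).
public class HighestRemoteOffsetSketch {

    // epochEndOffsets:      epoch -> end offset recorded in the local leader-epoch checkpoint
    // remoteHighestOffsets: epoch -> highest offset copied to remote storage for that epoch
    static long findHighestRemoteOffset(TreeMap<Integer, Long> epochEndOffsets,
                                        TreeMap<Integer, Long> remoteHighestOffsets) {
        for (Integer epoch : epochEndOffsets.descendingKeySet()) {
            Long remoteOffset = remoteHighestOffsets.get(epoch);
            if (remoteOffset != null) {
                // Cap at the checkpointed end offset so offsets written by a
                // diverged (uncleanly replaced) leader are not treated as copied.
                return Math.min(epochEndOffsets.get(epoch), remoteOffset);
            }
        }
        return -1L; // nothing copied to remote storage for any known epoch
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> local = new TreeMap<>();
        local.put(0, 100L); // epoch 0 ended at offset 100 in the local checkpoint
        local.put(1, 150L); // epoch 1 ended at offset 150 in the local checkpoint

        TreeMap<Integer, Long> remote = new TreeMap<>();
        remote.put(1, 180L); // remote metadata claims offsets up to 180 for epoch 1 (diverged)

        // The checkpoint's end offset wins, avoiding local/remote log divergence.
        System.out.println(findHighestRemoteOffset(local, remote)); // prints 150
    }
}
```

The descending traversal mirrors the behaviour described in the PR: epochs without any copied segments are skipped until an epoch with remote data is found, and only then is the min applied.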