[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268420993


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java:
##
@@ -100,17 +100,29 @@ void handleSegmentWithCopySegmentStartedState(RemoteLogSegmentId remoteLogSegmen
 
 void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
Long leaderEpochEndOffset) {
+// If there are duplicate segments uploaded due to leader-election, then mark them as unreferenced.
+// Duplicate segments can be uploaded when the previous leader had tier-lags and the next leader uploads the
+// segment for the same leader-epoch which is a super-set of previously uploaded segments.
+// (eg)
+// case-1: Duplicate segment
+//  L0 uploaded segment S0 with offsets 0-100 and L1 uploaded segment S1 with offsets 0-200.
+//  We will mark the segment S0 as duplicate and add it to unreferencedSegmentIds.
+// case-2: Overlapping segments
+//  L0 uploaded segment S0 with offsets 10-90 and L1 uploaded segment S1 with offsets 5-100, S2-101-200,
+//  and so on. When the consumer request for segment with offset 95, it should get the segment S1 and not S0.
+Map.Entry lastEntry = offsetToId.lastEntry();
+while (lastEntry != null && lastEntry.getKey() >= startOffset && highestLogOffset <= leaderEpochEndOffset) {

Review Comment:
   > But are there any drawbacks to removing highestLogOffset <= leaderEpochEndOffset from the while? What kind of problem we can face and why having this check is more safe rather than remove it?
   
   Assume there were multiple back-to-back unclean leader elections and only **one replica** was available at any point in time. Then neither B1 nor B2 may be aware of all the leader epochs. If a consumer reads the data from the beginning of the topic, we should still be able to serve the remote log segments for every (epoch, start_offset) present in B1 and in B2.
   
   This patch marks a segment as unreferenced only if the current segment is a superset of all the previously uploaded segments for the same epoch, which means the messages are the same across those segments.
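
   A minimal Java sketch of the guard under discussion, assuming a simplified per-epoch state: the class `EpochStateSketch`, its method names, and the string segment ids are hypothetical, not the PR's code. It mirrors the while-loop from the diff, so earlier entries are dropped only when the new segment's end offset reaches at least `highestLogOffset`; a non-superset upload leaves the earlier segment referenced and still servable.

   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.Set;
   import java.util.TreeMap;

   public class EpochStateSketch {
       // Start offset -> segment id for a single leader epoch (hypothetical model).
       final NavigableMap<Long, String> offsetToId = new TreeMap<>();
       final Set<String> unreferencedSegmentIds = new HashSet<>();
       // Highest end offset among segments copied so far for this epoch (null until the first copy).
       Long highestLogOffset = null;

       void onCopyFinished(long startOffset, String segmentId, long leaderEpochEndOffset) {
           Map.Entry<Long, String> lastEntry = offsetToId.lastEntry();
           // A non-null lastEntry implies highestLogOffset was set by an earlier call.
           // Earlier segments are replaced only if the new one reaches at least as far,
           // i.e. it is a superset of everything uploaded for this epoch so far.
           while (lastEntry != null && lastEntry.getKey() >= startOffset
                   && highestLogOffset <= leaderEpochEndOffset) {
               offsetToId.remove(lastEntry.getKey());
               unreferencedSegmentIds.add(lastEntry.getValue());
               lastEntry = offsetToId.lastEntry();
           }
           offsetToId.put(startOffset, segmentId);
           if (highestLogOffset == null || leaderEpochEndOffset > highestLogOffset) {
               highestLogOffset = leaderEpochEndOffset;
           }
       }

       // Segment serving a fetch at `offset`: the entry with the greatest start offset <= offset.
       String segmentFor(long offset) {
           Map.Entry<Long, String> entry = offsetToId.floorEntry(offset);
           return entry == null ? null : entry.getValue();
       }

       public static void main(String[] args) {
           // case-2 from the diff comment: S0 [10-90], then S1 [5-100] and S2 [101-200].
           EpochStateSketch overlap = new EpochStateSketch();
           overlap.onCopyFinished(10, "S0", 90);
           overlap.onCopyFinished(5, "S1", 100);
           overlap.onCopyFinished(101, "S2", 200);
           System.out.println(overlap.segmentFor(95) + " " + overlap.unreferencedSegmentIds); // S1 [S0]

           // Non-superset upload (e.g. after an unclean election): S0 [10-100], then S1 [0-50].
           EpochStateSketch partial = new EpochStateSketch();
           partial.onCopyFinished(10, "S0", 100);
           partial.onCopyFinished(0, "S1", 50);
           System.out.println(partial.segmentFor(95) + " " + partial.unreferencedSegmentIds); // S0 []
       }
   }
   ```

   Without the `highestLogOffset <= leaderEpochEndOffset` condition, the second scenario would mark S0 as unreferenced even though S1 does not cover offsets 51-100.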
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-19 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1268399339



Review Comment:
   @satishd @Nickstery Correct me if I'm wrong:
   
   The logic to find the `copiedOffset` from remote storage doesn't take into account the current leader-epoch-checkpoint file. In the above test, when B2 becomes leader, its leader-epoch-checkpoint file will look like this:
   
   (The case described in the test can happen when `acks` is set to 1.)
   
   ```
   0
   2
   0 0
   1 151
   ```
   
   The logic to find the copied offset traverses the checkpoint file starting from the latest epoch. So, when B2 tries to find the copied offset:
   
   For epoch 1, there won't be any uploaded segments, so it returns empty.
   For epoch 0, the highest copied offset will be 200.
   
   So, B2 will skip segment S2 (101-190), which means the data in offsets [151-190] is lost.
   
   @Nickstery This can be fixed if we update the logic to find the `copiedOffset`:
   ```
   find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
   ```
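
   A minimal Java sketch of the proposed rule, assuming the checkpoint layout shown above (version line, entry count, then `epoch start_offset` pairs) and treating an epoch's end offset as inclusive, i.e. one less than the next epoch's start offset. `highestRemoteOffsetByEpoch` and `logEndOffset` are hypothetical stand-ins for the remote log metadata and B2's local log; this is not the `RemoteLogManager` code.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.Optional;
   import java.util.TreeMap;

   public class CopiedOffsetSketch {

       // Parses leader-epoch-checkpoint contents: version, entry count, then "epoch startOffset" lines.
       static TreeMap<Integer, Long> parseCheckpoint(List<String> lines) {
           int version = Integer.parseInt(lines.get(0).trim());
           if (version != 0) {
               throw new IllegalArgumentException("Unsupported checkpoint version: " + version);
           }
           int count = Integer.parseInt(lines.get(1).trim());
           TreeMap<Integer, Long> epochToStartOffset = new TreeMap<>();
           for (int i = 0; i < count; i++) {
               String[] parts = lines.get(2 + i).trim().split("\\s+");
               epochToStartOffset.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
           }
           return epochToStartOffset;
       }

       // Inclusive end offset of `epoch`: the next epoch's start offset minus one,
       // or logEndOffset - 1 when `epoch` is the latest entry.
       static long endOffsetForEpoch(NavigableMap<Integer, Long> epochs, int epoch, long logEndOffset) {
           Map.Entry<Integer, Long> next = epochs.higherEntry(epoch);
           return next == null ? logEndOffset - 1 : next.getValue() - 1;
       }

       // Walks the checkpoint from the latest epoch; for the first epoch with uploaded segments,
       // returns min(end offset for that epoch in the checkpoint, highest remote offset for that epoch).
       static Optional<Long> findCopiedOffset(NavigableMap<Integer, Long> epochs,
                                              Map<Integer, Long> highestRemoteOffsetByEpoch,
                                              long logEndOffset) {
           for (int epoch : epochs.descendingKeySet()) {
               Long highestRemote = highestRemoteOffsetByEpoch.get(epoch);
               if (highestRemote != null) {
                   return Optional.of(Math.min(endOffsetForEpoch(epochs, epoch, logEndOffset), highestRemote));
               }
           }
           return Optional.empty();
       }

       public static void main(String[] args) {
           TreeMap<Integer, Long> b2Checkpoint = parseCheckpoint(List.of("0", "2", "0 0", "1 151"));
           Map<Integer, Long> highestRemote = new HashMap<>();
           highestRemote.put(0, 200L); // epoch-0 data up to offset 200 was already uploaded
           // Epoch 1 has no uploads, so epoch 0 decides: min(150, 200) = 150 instead of 200.
           System.out.println(findCopiedOffset(b2Checkpoint, highestRemote, 191L).orElse(-1L)); // 150
       }
   }
   ```

   With a copied offset of 150, S2 (101-190) still holds offsets above it, so under these assumptions B2 would upload it instead of skipping it.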






[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-18 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1267015333



Review Comment:
   Thinking more on this, we may have to update the logic to find the [copiedOffset](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L499) so that it is the minimum of (end_offset_for_that_epoch_in_checkpoint_file, highest_remote_offset). In this case, when B2 was leader, copied_offset should be min(80, 112), where 80 is the end_offset for LE0 and 112 is the highest_remote_offset for LE0.
   
   That way, B2 is able to upload segment S2 (12:101).
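
   A small Java sketch of why the min() matters for segment selection, assuming a local segment is an upload candidate when its inclusive end offset is above the copied offset. `Segment` (a Java 16+ record) and `segmentsToUpload` are hypothetical names, and the name of the first segment is made up; only S2 (12:101) and the offsets 80 and 112 come from the comment.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class UploadCandidatesSketch {
       // Hypothetical local segment with inclusive base and end offsets.
       record Segment(String name, long baseOffset, long endOffset) {}

       // A segment is still a candidate only if it holds some offset above copiedOffset.
       static List<String> segmentsToUpload(List<Segment> localSegments, long copiedOffset) {
           List<String> candidates = new ArrayList<>();
           for (Segment segment : localSegments) {
               if (segment.endOffset() > copiedOffset) {
                   candidates.add(segment.name());
               }
           }
           return candidates;
       }

       public static void main(String[] args) {
           // B2's two local segments after the unclean election.
           List<Segment> b2Segments = List.of(new Segment("[1-11]", 1, 11), new Segment("S2", 12, 101));
           long highestRemoteOffsetForLe0 = 112; // uploaded earlier by B1
           long endOffsetForLe0InCheckpoint = 80; // last LE0 offset in B2's checkpoint
           // Current logic: copied offset is 112, so S2 is skipped -> prints []
           System.out.println(segmentsToUpload(b2Segments, highestRemoteOffsetForLe0));
           // Proposed logic: copied offset is min(80, 112) = 80, so S2 is uploaded -> prints [S2]
           System.out.println(segmentsToUpload(b2Segments,
                   Math.min(endOffsetForLe0InCheckpoint, highestRemoteOffsetForLe0)));
       }
   }
   ```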






[GitHub] [kafka] kamalcph commented on a diff in pull request #14004: KAFKA-15168: Handle overlapping remote log segments in RemoteLogMetadata cache

2023-07-18 Thread via GitHub


kamalcph commented on code in PR #14004:
URL: https://github.com/apache/kafka/pull/14004#discussion_r1266989386



Review Comment:
   1. `highestLogOffset` is the highest end offset of all the segments uploaded so far, so it does not need to be updated while removing the last entry inside the while loop. A last entry is eligible for removal only when the `highestLogOffset` seen so far is less than or equal to the current segment's end offset.
   2. The case you mentioned is about unclean leader election. A passive segment is eligible for upload to remote storage only when the last-stable-offset is ahead of the passive segment's end offset. When B2 was restored, B1 was the last standing replica and it died with a disk failure, so B2 can only be elected through unclean leader election.
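
   A minimal Java sketch of the eligibility rule in point 2, assuming inclusive segment offsets and that the last segment in the list is the active one. `LocalSegment` and `passiveSegmentsEligibleForUpload` are hypothetical names, not Kafka APIs, and the offsets in `main` are made up for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class LsoEligibilitySketch {
       // Hypothetical local segment with inclusive base and end offsets.
       record LocalSegment(long baseOffset, long endOffset) {}

       // Returns the passive segments whose end offset is below the last-stable-offset.
       // The last element of `segments` is treated as the active segment and never returned.
       static List<LocalSegment> passiveSegmentsEligibleForUpload(List<LocalSegment> segments,
                                                                  long lastStableOffset) {
           List<LocalSegment> eligible = new ArrayList<>();
           for (int i = 0; i < segments.size() - 1; i++) {
               LocalSegment segment = segments.get(i);
               if (segment.endOffset() < lastStableOffset) {
                   eligible.add(segment);
               }
           }
           return eligible;
       }

       public static void main(String[] args) {
           List<LocalSegment> log = List.of(
                   new LocalSegment(0, 99), new LocalSegment(100, 199), new LocalSegment(200, 250));
           // With the LSO at 150, only [0-99] lies fully below it; [100-199] has to wait.
           System.out.println(passiveSegmentsEligibleForUpload(log, 150).size()); // 1
       }
   }
   ```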
   
   When B2 is elected as an unclean leader, it has 2 segments, [1-11] and [12-101], where the second segment [12-101] contains messages written under more than one leader epoch (LE0: 12-80 and LE1: 81-101). B2 cannot upload segment S2 (12:101) because the highest offset uploaded so far to remote storage for LE0 is 112. B2 can only upload starting from the segment that contains offset 113.
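
   To make the multi-epoch segment concrete, a minimal Java sketch that splits an inclusive segment range by the epoch start offsets recorded in a leader-epoch checkpoint. It assumes LE0 starts at offset 0 (not stated above) and LE1 at 81; `Range` and `epochRanges` are hypothetical names.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   public class SegmentEpochsSketch {
       // Inclusive offset range written under one epoch.
       record Range(long from, long to) {}

       // Intersects [segmentBase, segmentEnd] with each epoch's inclusive range, in epoch order.
       static Map<Integer, Range> epochRanges(NavigableMap<Integer, Long> epochToStartOffset,
                                              long segmentBase, long segmentEnd) {
           Map<Integer, Range> ranges = new LinkedHashMap<>();
           for (Map.Entry<Integer, Long> entry : epochToStartOffset.entrySet()) {
               Map.Entry<Integer, Long> next = epochToStartOffset.higherEntry(entry.getKey());
               long epochEnd = next == null ? Long.MAX_VALUE : next.getValue() - 1;
               long from = Math.max(segmentBase, entry.getValue());
               long to = Math.min(segmentEnd, epochEnd);
               if (from <= to) {
                   ranges.put(entry.getKey(), new Range(from, to));
               }
           }
           return ranges;
       }

       public static void main(String[] args) {
           NavigableMap<Integer, Long> epochs = new TreeMap<>(Map.of(0, 0L, 1, 81L));
           epochRanges(epochs, 12, 101).forEach((epoch, r) ->
                   System.out.println("LE" + epoch + ": " + r.from() + "-" + r.to()));
           // LE0: 12-80
           // LE1: 81-101
       }
   }
   ```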
   
   When B1 is brought back as a replica, it truncates itself up to the largest common log prefix offset (target-offset), including its leader-epoch-checkpoint file, so it cannot serve the uploaded segments from remote storage. If the target-offset is less than the local-log-start-offset, the uploaded remote log segments will eventually be cleared once the smallest (leader_epoch, start_offset) in the leader-epoch-checkpoint file is greater than the unreferenced segment's epoch.
   
   `RemoteLogLeaderEpochState` is maintained per epoch, so we cannot extend it to cover multiple leader_epochs.
   
   In remote storage, the records for offsets [81-112] will be maintained for both LE0 and LE1. With [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation), the fetcher can detect and handle the log truncation itself.
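
   A rough Java sketch of the KIP-320 idea applied to this scenario. It assumes the leader answers an end-offset query for the fetcher's last fetched epoch with the start offset of the next higher epoch it knows (an exclusive end), or its log end offset for its latest epoch, falling back to the largest epoch below the requested one if it does not know that epoch; the fetcher then continues from min(fetch offset, that end offset). `validatedPosition`, the log end offset of 200, and LE0 starting at 0 are hypothetical; this is not the client implementation.

   ```java
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   public class TruncationDetectionSketch {
       // leaderEpochs: epoch -> start offset on the current leader (e.g. B2).
       static long validatedPosition(NavigableMap<Integer, Long> leaderEpochs, long leaderLogEndOffset,
                                     int lastFetchedEpoch, long fetchOffset) {
           // Largest epoch on the leader that is <= the fetcher's last fetched epoch.
           Map.Entry<Integer, Long> epoch = leaderEpochs.floorEntry(lastFetchedEpoch);
           if (epoch == null) {
               throw new IllegalStateException("Leader has no epoch <= " + lastFetchedEpoch);
           }
           // Exclusive end offset of that epoch on the leader.
           Map.Entry<Integer, Long> next = leaderEpochs.higherEntry(epoch.getKey());
           long endOffset = next == null ? leaderLogEndOffset : next.getValue();
           // A fetch position beyond the leader's end offset for the epoch means the logs diverged.
           return Math.min(fetchOffset, endOffset);
       }

       public static void main(String[] args) {
           NavigableMap<Integer, Long> b2Epochs = new TreeMap<>(Map.of(0, 0L, 1, 81L));
           // A consumer that read LE0 records up to offset 112 (uploaded by B1) next fetches 113.
           System.out.println(validatedPosition(b2Epochs, 200L, 0, 113L)); // 81
       }
   }
   ```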
   
   Thanks for the detailed review! Let me know if I'm missing something.
   


