Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-08 Thread via GitHub


showuon merged PR #15817:
URL: https://github.com/apache/kafka/pull/15817


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-08 Thread via GitHub


showuon commented on PR #15817:
URL: https://github.com/apache/kafka/pull/15817#issuecomment-2099912904

   Failed tests are unrelated.





Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-06 Thread via GitHub


satishd commented on PR #15817:
URL: https://github.com/apache/kafka/pull/15817#issuecomment-2095255793

   Thanks @abhijeetk88 for reviewing the changes.





Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-04 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1589925627


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1245,19 +1260,27 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
             return false;
         }

-        // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset.
-        if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
-            LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.",
-                    segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
+        // Two cases:
+        // case-1: When the segment-first-epoch equals to the first-epoch in the leader-epoch-lineage, then the
+        // offset value can lie anywhere between 0 to (next-epoch-start-offset - 1) is valid.
+        // case-2: When the segment-first-epoch is not equal to the first-epoch in the leader-epoch-lineage, then
+        // the offset value should be between (current-epoch-start-offset) to (next-epoch-start-offset - 1).
+        if (epoch == segmentFirstEpoch && leaderEpochs.lowerKey(epoch) != null && offset < leaderEpochs.get(epoch)) {
+            LOGGER.debug("Segment {} first-valid epoch {} offset is less than leader epoch offset {}." +

Review Comment:
   nit: is less than "first" leader epoch offset...
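   The two cases spelled out in the diff's comment can be checked in isolation. The helper below is a hypothetical, simplified sketch (not the PR's actual method); it assumes `segmentFirstEpoch` is present as a key in the leader-epoch lineage, as in the quoted code:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class EpochOffsetCheck {

    // Simplified model of the first-epoch validation discussed above.
    // case-1: the segment's first epoch is also the first epoch in the
    //         lineage (no lower entry exists), so an offset below the
    //         epoch's start offset is still acceptable.
    // case-2: a lower epoch exists in the lineage, so the offset must not
    //         precede the epoch's start offset.
    // Assumes segmentFirstEpoch is a key in leaderEpochs.
    static boolean isFirstEpochOffsetValid(NavigableMap<Integer, Long> leaderEpochs,
                                           int segmentFirstEpoch,
                                           long offset) {
        if (leaderEpochs.lowerKey(segmentFirstEpoch) == null) {
            return true;                                      // case-1
        }
        return offset >= leaderEpochs.get(segmentFirstEpoch); // case-2
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(5, 10L);   // epoch 5 starts at offset 10
        leaderEpochs.put(7, 15L);   // epoch 7 starts at offset 15
        leaderEpochs.put(9, 100L);  // epoch 9 starts at offset 100

        // case-1: epoch 5 is the first epoch in the lineage, so offset 3 is fine.
        System.out.println(isFirstEpochOffsetValid(leaderEpochs, 5, 3L));  // true
        // case-2: epoch 7 has a predecessor, so offset 12 (< 15) is rejected.
        System.out.println(isFirstEpochOffsetValid(leaderEpochs, 7, 12L)); // false
    }
}
```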






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1589055167


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                    "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.

Review Comment:
   Given that we roll the log based on `log.roll.ms` and the server startup 
time can vary between the replicas, this is quite common.






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588971961


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                    "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.

Review Comment:
   OK, Broker1 having a segment size different from Broker0 could be because 
Broker1 has a larger `log.segment.bytes` than Broker0. That makes sense. 
It's quite rare to see, though.






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588785888


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {

Review Comment:
   I'll add integration tests with LocalTieredStorage to cover various 
scenarios. The test will be useful to catch any regression if we refactor the 
code later.






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588784969


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                    "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.

Review Comment:
   yes, you're right. The `findHighestRemoteOffset` will return the 
`copiedOffset` for the new leader. 
   
   The case mentioned can happen because the logic to rotate the segments is 
independent across the brokers in the cluster. 
   
   Broker-0 can have local-log segments like:
  segment0 = 0-50
  segment1 = 51-100
  active-segment = 101-120

and it uploaded both segment0 and segment1 to remote storage.

Broker1 can have local-log segments like:
  segment0 = 0 - 80
  active-segment = 81-120
  
   When Broker1 becomes leader, it finds the highest remote-offset as 100, then 
it has to upload the current active segment once it gets rotated. 
Thereby, we will have overlapping remote log segments.  
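   The overlap described above can be sketched with plain offset ranges. The `Range` helper below is illustrative (not part of the PR); the broker offset ranges are the ones from the example:

```java
public class SegmentOverlap {

    // Closed offset range [start, end] of an uploaded remote segment.
    record Range(long start, long end) {
        boolean overlaps(Range other) {
            return start <= other.end && other.start <= end;
        }
    }

    public static void main(String[] args) {
        // Broker-0 (old leader) uploaded segment1 covering offsets 51-100.
        Range broker0Segment1 = new Range(51, 100);
        // Broker-1 rolls its log at different boundaries; once its active
        // segment 81-120 is rotated and uploaded after the leader change,
        // offsets 81-100 exist in both remote segments.
        Range broker1Rolled = new Range(81, 120);
        System.out.println(broker0Segment1.overlaps(broker1Rolled)); // true
    }
}
```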











Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-03 Thread via GitHub


kamalcph commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1588781775


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {

Review Comment:
   For this case, the segment will be deleted by the 
`isSegmentBreachByLogStartOffset` check and it won't enter 
`isRemoteSegmentWithinLeaderEpochs` check.






Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-05-01 Thread via GitHub


showuon commented on code in PR #15817:
URL: https://github.com/apache/kafka/pull/15817#discussion_r1587041449


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {

Review Comment:
   I'm not sure if the check `segmentLastEpoch < leaderEpochs.firstKey()` makes 
sense or not.
   
   Suppose: 
   leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
   segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
   Now, delete_records are called and log start offset incremented to 100, so 
the new leader-epoch-file-cache will be: {(9, 100)}
   
   When entering this check, it'll fail because the segmentLastEpoch (7) will 
be < leaderEpochs.firstKey() (9). But we still want to delete this segment, 
right?
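   The delete_records scenario above can be checked mechanically. The snippet below is a simplified model (it only handles the case where the new log-start-offset lands exactly on an epoch boundary, as in this example), not Kafka's actual leader-epoch-cache truncation:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class DeleteRecordsScenario {
    public static void main(String[] args) {
        // Leader-epoch lineage before delete_records: {(5, 10), (7, 15), (9, 100)}
        NavigableMap<Integer, Long> leaderEpochs = new TreeMap<>();
        leaderEpochs.put(5, 10L);
        leaderEpochs.put(7, 15L);
        leaderEpochs.put(9, 100L);

        // delete_records moves the log-start-offset to 100. Simplified
        // truncation: drop entries whose start offset is below the new
        // start offset (exact here only because 100 is an epoch boundary).
        long newLogStartOffset = 100L;
        leaderEpochs.entrySet().removeIf(e -> e.getValue() < newLogStartOffset);
        System.out.println(leaderEpochs); // {9=100}

        // segment1's epochs were {(5, 10), (7, 15)}: its last epoch (7) is
        // now below leaderEpochs.firstKey() (9), so the guard under
        // discussion would return false even though the segment should be
        // deletable.
        int segmentLastEpoch = 7;
        System.out.println(segmentLastEpoch < leaderEpochs.firstKey()); // true
    }
}
```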



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1217,26 +1217,41 @@ public String toString() {
      * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
      */
     // Visible for testing
-    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
-                                                            long logEndOffset,
-                                                            NavigableMap<Integer, Long> leaderEpochs) {
+    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                     long logEndOffset,
+                                                     NavigableMap<Integer, Long> leaderEpochs) {
         long segmentEndOffset = segmentMetadata.endOffset();
         // Filter epochs that does not have any messages/records associated with them.
         NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
         // Check for out of bound epochs between segment epochs and current leader epochs.
-        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
         Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
-        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+        if (segmentLastEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
+                    "Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+        // There can be overlapping remote log segments in the remote storage. (eg)
+        // leader-epoch-file-cache: {(5, 10), (7, 15), (9, 100)}
+        // segment1: offset-range = 5-50, Broker = 0, epochs = {(5, 10), (7, 15)}
+        // segment2: offset-range = 14-150, Broker = 1, epochs = {(5, 14), (7, 15), (9, 100)}, after leader-election.
+        // When the segment1 gets deleted, then the log-start-offset = 51 and leader-epoch-file-cache gets updated to: {(7, 51), (9, 100)}.
+        // While validating the segment2, we should ensure the overlapping remote log segments case.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.ceilingKey(leaderEpochs.firstKey());
+        if (segmentFirstEpoch == null || !leaderEpochs.containsKey(segmentFirstEpoch)) {

Review Comment:
   Same here, if the above case makes sense, this check also fails to delete 
the segment.




Re: [PR] KAFKA-16511: Fix the leaking tiered segments during segment deletion [kafka]

2024-04-27 Thread via GitHub


kamalcph commented on PR #15817:
URL: https://github.com/apache/kafka/pull/15817#issuecomment-2081328537

   Test failures are unrelated.

