Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2024-01-16 Thread via GitHub


junrao commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1453768411


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-      // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled.
-      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) &&
-        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
-        // offset can never exceed it.
-        highWatermark >= upperBoundOffset &&
-        predicate(segment, nextSegmentOpt)
-    }
     lock synchronized {
-      val deletable = localLog.deletableSegments(shouldDelete)
+      val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
         0
     }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the next higher segment
+   *                  (if there is one). It returns true iff the segment is deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+      // Segments are eligible for deletion when:
+      //    1. they are uploaded to the remote storage
+      if (remoteLogEnabled()) {
+        upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+      } else {
+        true
+      }
+    }
+
+    if (localLog.segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      val segmentsIterator = localLog.segments.values.iterator
+      var segmentOpt = nextOption(segmentsIterator)
+      var shouldRoll = false
+      while (segmentOpt.isDefined) {
+        val segment = segmentOpt.get
+        val nextSegmentOpt = nextOption(segmentsIterator)
+        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+        val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset
+        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
+        // offset can never exceed it.
+        val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
+
+        // Roll the active segment when it breaches the configured retention policy. The rolled segment will be
+        // eligible for deletion and gets removed in the next iteration.
+        if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) {
+          shouldRoll = true

Review Comment:
   @kamalcph : Thanks for the explanation. I understand this change now.






Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2024-01-12 Thread via GitHub


kamalcph commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1451300245


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-      // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled.
-      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) &&
-        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
-        // offset can never exceed it.
-        highWatermark >= upperBoundOffset &&
-        predicate(segment, nextSegmentOpt)
-    }
     lock synchronized {
-      val deletable = localLog.deletableSegments(shouldDelete)
+      val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
         0
     }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the next higher segment
+   *                  (if there is one). It returns true iff the segment is deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+      // Segments are eligible for deletion when:
+      //    1. they are uploaded to the remote storage
+      if (remoteLogEnabled()) {
+        upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+      } else {
+        true
+      }
+    }
+
+    if (localLog.segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      val segmentsIterator = localLog.segments.values.iterator
+      var segmentOpt = nextOption(segmentsIterator)
+      var shouldRoll = false
+      while (segmentOpt.isDefined) {
+        val segment = segmentOpt.get
+        val nextSegmentOpt = nextOption(segmentsIterator)
+        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+        val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset
+        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
+        // offset can never exceed it.
+        val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
+
+        // Roll the active segment when it breaches the configured retention policy. The rolled segment will be
+        // eligible for deletion and gets removed in the next iteration.
+        if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) {
+          shouldRoll = true

Review Comment:
   For a normal topic, we consider all the segments (incl. the active one) for deletion due to a size/time breach. When the number of deletable segments equals the number of segments that exist on local disk, we `roll` a new segment and delete all the eligible segments in the same iteration.
   
   [UnifiedLog#deleteSegments](https://sourcegraph.com/github.com/apache/kafka@21227bda61e75e3a8f1401ff94b27e9161cd3f1b/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1505-L1511)
   
   With remote storage enabled on the topic, once the active segment gets rolled, we have to successfully upload it to remote storage before considering it eligible for deletion, so we do this in two iterations.
   
   When the `kafka-log-retention` thread runs:
   
   Iteration-1: If the active segment has breached the local-retention time/size, it rolls the segment to passive.
   Iteration-2: If the rolled passive segment has been uploaded to remote storage, it removes that segment from local disk.
   
   The remote-retention-cleaner thread removes the uploaded segment from remote storage when it breaches the complete retention-size/retention-time. (A sketch of this two-pass decision follows below.)
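   
   To make the ordering concrete, here is a minimal sketch (assumed, simplified types; not the actual `UnifiedLog` code) of the per-segment decision across those two iterations:
   
   ```scala
   object RetentionSketch {
     sealed trait Action
     case object Roll extends Action        // Iteration-1: turn the active segment passive
     case object DeleteLocal extends Action // Iteration-2: drop the uploaded passive segment
     case object Keep extends Action
   
     final case class Seg(size: Long, isActive: Boolean, uploadedToRemote: Boolean)
   
     def retentionAction(seg: Seg, breachedLocalRetention: Boolean): Action =
       if (!breachedLocalRetention) Keep
       else if (seg.isActive && seg.size > 0) Roll // cannot be deleted before it is uploaded
       else if (!seg.isActive && seg.uploadedToRemote) DeleteLocal
       else Keep                                   // rolled but not yet uploaded: wait
   }
   ```
   
   On the first run the active segment is only rolled; a later run returns `DeleteLocal` for it once the upload has completed.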
   







Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2024-01-12 Thread via GitHub


junrao commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1450990461


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-      // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled.
-      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) &&
-        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
-        // offset can never exceed it.
-        highWatermark >= upperBoundOffset &&
-        predicate(segment, nextSegmentOpt)
-    }
     lock synchronized {
-      val deletable = localLog.deletableSegments(shouldDelete)
+      val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
         0
     }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the next higher segment
+   *                  (if there is one). It returns true iff the segment is deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+      // Segments are eligible for deletion when:
+      //    1. they are uploaded to the remote storage
+      if (remoteLogEnabled()) {
+        upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+      } else {
+        true
+      }
+    }
+
+    if (localLog.segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      val segmentsIterator = localLog.segments.values.iterator
+      var segmentOpt = nextOption(segmentsIterator)
+      var shouldRoll = false
+      while (segmentOpt.isDefined) {
+        val segment = segmentOpt.get
+        val nextSegmentOpt = nextOption(segmentsIterator)
+        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+        val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset
+        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
+        // offset can never exceed it.
+        val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
+
+        // Roll the active segment when it breaches the configured retention policy. The rolled segment will be
+        // eligible for deletion and gets removed in the next iteration.
+        if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) {
+          shouldRoll = true

Review Comment:
   > We have checks to allow only the passive segments to be uploaded, so the 
active segment never gets removed at all even if breaches the retention time.
   
   Hmm, the above description of the PR doesn't seem correct. Before this PR, we would consider the active segment for deletion as long as it's not empty. If the active segment meets the deletion criteria, we delete it and automatically roll a new empty segment. So, why do we need to explicitly roll here? Also, why do we only do that when remote log is enabled?






Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd merged PR #14766:
URL: https://github.com/apache/kafka/pull/14766





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1829038408

   Thanks @kamalcph for pointing out the JIRA that tracks the existing intermittent tiered-storage-related test failure.





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


kamalcph commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1828074097

   > @kamalcph The below tests have failed, please take a look.
   > 
   > ```
   > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=zk – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   > 1m 21s
   > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   > 1m 9s
   > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   > ```
   
   @satishd This seems to be a flaky test that was failing even before this patch was opened. We can take it up separately:
   
   https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=169981379&search.startTimeMin=169860420&search.timeZoneId=Asia%2FCalcutta&tests.container=org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest&tests.test=testSendOffsetsWithGroupId(String)%5B1%5D





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827617942

   @kamalcph The below tests have failed, please take a look.
   
   ```
   Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=zk – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   1m 21s
   Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   1m 9s
   Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   ```





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-27 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827613715

   Thanks @clolov for the update. 





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-26 Thread via GitHub


satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827083641

   @clolov: The queries that you had in your earlier comments were addressed by 
@kamalcph, please take a look. I plan to merge these changes if you have no 
further comments. Thanks.





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-22 Thread via GitHub


kamalcph commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1823109337

   > The first one is that I no longer see a point in segment.bytes and 
segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect 
to tiered topics. If a person says "hey, I only want 4GB of data or data from 
the last 10 minutes around" then why would they ever need to configure how 
often a segment should be closed? If this is the case shouldn't this be 
followed by ignoring those two properties for tiered topics?
   
   `segment.bytes` takes effect when the topic has a continuous inflow of data. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 1 MB/sec, then the segment gets filled in ~17 mins and gets rotated to passive.
   
   `segment.ms` takes effect when the inflow of data is too slow to fill the segment. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 100 bytes/sec, then the segment won't be filled and gets rotated to passive after 1 day.
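   
   A quick back-of-the-envelope check of those two cases (a standalone sketch; `rollAfterMs` is a made-up helper, not Kafka code):
   
   ```scala
   object RollMath extends App {
     val segmentBytes = 1L << 30             // segment.bytes = 1 GiB
     val segmentMs    = 24L * 60 * 60 * 1000 // segment.ms = 1 day
   
     // The segment rolls at whichever of segment.bytes / segment.ms is hit first.
     def rollAfterMs(bytesInPerSec: Long): Long =
       math.min(segmentMs, segmentBytes * 1000L / bytesInPerSec)
   
     println(rollAfterMs(1L << 20)) // 1024000 ms (~17 min): size-based roll wins at 1 MiB/sec
     println(rollAfterMs(100L))     // 86400000 ms (1 day): time-based roll wins at 100 bytes/sec
   }
   ```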
   
   In this patch, we are trying to handle the case where the topic has some 
data in the active segment but doesn't have continuous inflow of data. So, both 
the `segment.ms` and `segment.bytes` configs are applicable for tiered storage 
topics.
   
   > The second one is that you will be changing the definition of 
local.retention. Prior to this change it meant that closed segments will be 
served from local disk for at most this much size or time as long as they have 
been moved to tiered storage. Now it will mean that anything beyond this size 
and time will be found only in tiered storage. Isn't this a "public facing 
change" and thus requiring some announcements?
   
   This was the original plan, and it is in line with how local-log segments behave. If a topic is deprecated and won't receive any more payload in the future, the user will expect all the data in that topic to be removed once the retention time passes; otherwise it can fail to meet certain compliance requirements.





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-21 Thread via GitHub


clolov commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1820664052

   I like this approach because it moves in the direction of us configuring how 
much data we would like to store locally based on size and time. However, in my 
head this introduces 2 problems.
   
   The first one is that I no longer see a point in segment.bytes and 
segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect 
to tiered topics. If a person says "hey, I only want 4GB of data or data from 
the last 10 minutes around" then why would they ever need to configure how 
often a segment should be closed? If this is the case shouldn't this be 
followed by ignoring those two properties for tiered topics?
   
   The second one is that you will be changing the definition of 
local.retention. Prior to this change it meant that closed segments will be 
served from local disk for at most this much size or time as long as they have 
been moved to tiered storage. Now it will mean that anything beyond this size 
and time will be found only in tiered storage. Isn't this a "public facing 
change" and thus requiring some announcements?





Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-20 Thread via GitHub


satishd commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1399008945


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,71 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                 reason: SegmentDeletionReason): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-      // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled.
-      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) &&
-        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
-        // offset can never exceed it.
-        highWatermark >= upperBoundOffset &&
-        predicate(segment, nextSegmentOpt)
-    }
     lock synchronized {
-      val deletable = localLog.deletableSegments(shouldDelete)
+      val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
         0
     }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the next higher segment
+   *                  (if there is one). It returns true iff the segment is deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+    def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+      // Segments are eligible for deletion when:
+      //    1. they are uploaded to the remote storage
+      if (remoteLogEnabled()) {
+        upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage
+      } else {
+        true
+      }
+    }
+
+    if (localLog.segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      val segmentsIterator = localLog.segments.values.iterator
+      var segmentOpt = nextOption(segmentsIterator)
+      var shouldRoll = false
+      while (segmentOpt.isDefined) {
+        val segment = segmentOpt.get
+        val nextSegmentOpt = nextOption(segmentsIterator)
+        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+        val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset
+        // We don't delete segments with offsets at or beyond the high watermark to ensure that the log start
+        // offset can never exceed it.
+        val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt)
+
+        // Roll the active segment when it breaches the configured retention policy. The rolled segment will be
+        // eligible for deletion and gets removed in the next iteration.
+        if (predicateResult && !isLastSegmentAndEmpty && remoteLogEnabled() && nextSegmentOpt.isEmpty) {

Review Comment:
   Can we use the below condition, which is easier to understand, or something better?
   ```
   predicateResult && remoteLogEnabled() &&
     nextSegmentOpt.isEmpty && segment.size > 0 // active segment is not empty
   ```
   
   Another way is to keep the last segment's predicate result and its size, and do the check after the while loop; see the sketch below.
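   
   The second alternative could look roughly like this (a rough sketch with made-up, simplified types; the real loop would also collect the deletable segments):
   
   ```scala
   object RollCheckAfterLoop {
     final case class Seg(size: Long)
   
     def shouldRollActive(segments: Iterator[Seg],
                          remoteLogEnabled: Boolean,
                          predicate: Seg => Boolean): Boolean = {
       var lastResult = false
       var lastSize = 0L
       while (segments.hasNext) {
         val seg = segments.next()
         lastResult = predicate(seg) // remember only the last (active) segment's result
         lastSize = seg.size
       }
       // Decide once, after the loop: roll only when remote log is enabled, the
       // active segment satisfied the deletion predicate, and it is non-empty.
       remoteLogEnabled && lastResult && lastSize > 0
     }
   }
   ```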



##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+im

Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-16 Thread via GitHub


kamalcph commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1395386695


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test to verify that the active segment is rolled and uploaded to remote storage when the segment breaches the
+ * local log retention policy.
+ */
+public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
+        final Integer maxBatchCountPerSegment = 1;
+        final Map<Integer, List<Integer>> replicaAssignment = null;
+        final boolean enableRemoteLogStorage = true;
+
+        // Create topicA with 1 partition, 1 RF and enabled with remote storage.
+        builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
+                enableRemoteLogStorage)
+                // update the topic config such that it triggers the rolling of the active segment
+                .updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList())
+                // produce events to partition 0 and expect all the 4 segments to be offloaded
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
+                .consume(topicA, p0, 0L, 4, 4);
+    }
+
+    private Map<String, String> configsToBeAdded() {
+        // Update localLog retentionMs to 1 ms and segment roll-time to 10 ms
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1");

Review Comment:
   `localRetentionMs` is set to 1 ms; once it has passed, the active segment will be rolled over and offloaded to the remote storage.
   
   Note that all the tests under the tiered-storage integration suite specify `localRetentionBytes` as 1 byte (see: [TieredStorageTestUtils](https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java?L174)) to offload the passive segments ASAP and to keep the active segment on local storage, so that reads can be asserted against both local and remote storage. We don't want to change the [deleteRetentionSizeBreachedSegments](https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1502-1513) behaviour.
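   
   For reference, a hedged sketch (simplified model, not Kafka code) of how the two retention layers divide the work once tiered storage is enabled: `local.retention.*` bounds the local copies of uploaded segments, while `retention.*` bounds the data as a whole and is enforced against remote storage.
   
   ```scala
   object RetentionLayers {
     final case class Seg(ageMs: Long, uploadedToRemote: Boolean)
   
     // kafka-log-retention thread: a local copy is deletable only after upload.
     def deleteLocalCopy(s: Seg, localRetentionMs: Long): Boolean =
       s.uploadedToRemote && s.ageMs > localRetentionMs // e.g. 1 ms in this test
   
     // remote-retention cleaner: removes the segment from remote storage entirely.
     def deleteFromRemote(s: Seg, retentionMs: Long): Boolean =
       s.uploadedToRemote && s.ageMs > retentionMs
   }
   ```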


