Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
junrao commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1453768411 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { -def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) - - // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled. - (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) && -// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start -// offset can never exceed it. -highWatermark >= upperBoundOffset && -predicate(segment, nextSegmentOpt) -} lock synchronized { - val deletable = localLog.deletableSegments(shouldDelete) + val deletable = deletableSegments(predicate) if (deletable.nonEmpty) deleteSegments(deletable, reason) else 0 } } + /** + * Find segments starting from the oldest until the user-supplied predicate is false. + * A final segment that is empty will never be returned. + * + * @param predicate A function that takes in a candidate log segment, the next higher segment + * (if there is one). It returns true iff the segment is deletable. + * @return the segments ready to be deleted + */ + private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { +def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + // Segments are eligible for deletion when: + //1. 
they are uploaded to the remote storage + if (remoteLogEnabled()) { +upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + } else { +true + } +} + +if (localLog.segments.isEmpty) { + Seq.empty +} else { + val deletable = ArrayBuffer.empty[LogSegment] + val segmentsIterator = localLog.segments.values.iterator + var segmentOpt = nextOption(segmentsIterator) + var shouldRoll = false + while (segmentOpt.isDefined) { +val segment = segmentOpt.get +val nextSegmentOpt = nextOption(segmentsIterator) +val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0 +val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset +// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start +// offset can never exceed it. +val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) + +// Roll the active segment when it breaches the configured retention policy. The rolled segment will be +// eligible for deletion and gets removed in the next iteration. +if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) { + shouldRoll = true Review Comment: @kamalcph : Thanks for the explanation. I understand this change now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
kamalcph commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1451300245 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { -def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) - - // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled. - (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) && -// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start -// offset can never exceed it. -highWatermark >= upperBoundOffset && -predicate(segment, nextSegmentOpt) -} lock synchronized { - val deletable = localLog.deletableSegments(shouldDelete) + val deletable = deletableSegments(predicate) if (deletable.nonEmpty) deleteSegments(deletable, reason) else 0 } } + /** + * Find segments starting from the oldest until the user-supplied predicate is false. + * A final segment that is empty will never be returned. + * + * @param predicate A function that takes in a candidate log segment, the next higher segment + * (if there is one). It returns true iff the segment is deletable. + * @return the segments ready to be deleted + */ + private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { +def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + // Segments are eligible for deletion when: + //1. 
they are uploaded to the remote storage + if (remoteLogEnabled()) { +upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + } else { +true + } +} + +if (localLog.segments.isEmpty) { + Seq.empty +} else { + val deletable = ArrayBuffer.empty[LogSegment] + val segmentsIterator = localLog.segments.values.iterator + var segmentOpt = nextOption(segmentsIterator) + var shouldRoll = false + while (segmentOpt.isDefined) { +val segment = segmentOpt.get +val nextSegmentOpt = nextOption(segmentsIterator) +val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0 +val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset +// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start +// offset can never exceed it. +val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) + +// Roll the active segment when it breaches the configured retention policy. The rolled segment will be +// eligible for deletion and gets removed in the next iteration. +if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) { + shouldRoll = true Review Comment: For a normal (non-tiered) topic, we consider all the segments (incl. the active one) for deletion when the retention size/time is breached. When the number of deletable segments equals the number of segments that exist on the local disk, we `roll` a new segment and delete all the eligible segments in the same iteration.
[UnifiedLog#deleteSegments](https://sourcegraph.com/github.com/apache/kafka@21227bda61e75e3a8f1401ff94b27e9161cd3f1b/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1505-L1511) With remote storage enabled on the topic, once the active segment gets rolled, we have to successfully upload it to the remote storage before considering it eligible for deletion, so we do this in two iterations. When the `kafka-log-retention` thread runs: Iteration-1: If the active segment has breached the local-retention time/size, then it rolls the segment to passive. Iteration-2: If the rolled passive segment has been uploaded to remote storage, then it removes that segment from the local disk. The remote-retention-cleaner thread will remove the uploaded segment from the remote storage when it breaches the complete retention-size/retention-time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
junrao commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1450990461 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,25 +1425,70 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { -def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) - - // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled. - (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) && -// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start -// offset can never exceed it. -highWatermark >= upperBoundOffset && -predicate(segment, nextSegmentOpt) -} lock synchronized { - val deletable = localLog.deletableSegments(shouldDelete) + val deletable = deletableSegments(predicate) if (deletable.nonEmpty) deleteSegments(deletable, reason) else 0 } } + /** + * Find segments starting from the oldest until the user-supplied predicate is false. + * A final segment that is empty will never be returned. + * + * @param predicate A function that takes in a candidate log segment, the next higher segment + * (if there is one). It returns true iff the segment is deletable. + * @return the segments ready to be deleted + */ + private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { +def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + // Segments are eligible for deletion when: + //1. 
they are uploaded to the remote storage + if (remoteLogEnabled()) { +upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + } else { +true + } +} + +if (localLog.segments.isEmpty) { + Seq.empty +} else { + val deletable = ArrayBuffer.empty[LogSegment] + val segmentsIterator = localLog.segments.values.iterator + var segmentOpt = nextOption(segmentsIterator) + var shouldRoll = false + while (segmentOpt.isDefined) { +val segment = segmentOpt.get +val nextSegmentOpt = nextOption(segmentsIterator) +val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0 +val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset +// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start +// offset can never exceed it. +val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) + +// Roll the active segment when it breaches the configured retention policy. The rolled segment will be +// eligible for deletion and gets removed in the next iteration. +if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0) { + shouldRoll = true Review Comment: > We have checks to allow only the passive segments to be uploaded, so the active segment never gets removed at all even if breaches the retention time. Hmm, the above description of the PR doesn't seem correct. Before this PR, we considered the active segment for deletion as long as it was not empty. If the active segment met the deletion criteria, we deleted it and automatically rolled a new empty segment. So, why do we need to explicitly roll here? Also, why do we only do that when remote log is enabled? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd merged PR #14766: URL: https://github.com/apache/kafka/pull/14766 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1829038408 Thanks @kamalcph for pointing out the JIRA to track the existing intermittent tiered storage related test failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
kamalcph commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1828074097 > @kamalcph The Below tests have failed, please take a look. > > ``` > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=zk – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest > 1m 21s > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest > 1m 9s > Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest > ``` @satishd This seems to be a flaky test and failing even before opening this patch. We can take this separately: https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=169981379&search.startTimeMin=169860420&search.timeZoneId=Asia%2FCalcutta&tests.container=org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest&tests.test=testSendOffsetsWithGroupId(String)%5B1%5D -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827617942 @kamalcph The Below tests have failed, please take a look. ``` Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=zk – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest 1m 21s Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest 1m 9s Build / JDK 17 and Scala 2.13 / testSendOffsetsWithGroupId(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827613715 Thanks @clolov for the update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1827083641 @clolov: The queries that you had in your earlier comments were addressed by @kamalcph, please take a look. I plan to merge these changes if you have no further comments. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
kamalcph commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1823109337 > The first one is that I no longer see a point in segment.bytes and segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect to tiered topics. If a person says "hey, I only want 4GB of data or data from the last 10 minutes around" then why would they ever need to configure how often a segment should be closed? If this is the case shouldn't this be followed by ignoring those two properties for tiered topics? `segment.bytes` will take effect when the topic has a continuous inflow of data that fills the segment before `segment.ms` elapses. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 1 MB/sec, then the segment gets filled in ~17 mins (1024 MB at 1 MB/sec) and gets rotated to passive. `segment.ms` will take effect when the topic has a continuous but slow inflow of data. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and the partition has a bytes-in load of 100 bytes/sec, then the segment won't be filled and gets rotated to passive after 1 day. In this patch, we are trying to handle the case where the topic has some data in the active segment but doesn't have a continuous inflow of data. So, both the `segment.ms` and `segment.bytes` configs are applicable for tiered storage topics. > The second one is that you will be changing the definition of local.retention. Prior to this change it meant that closed segments will be served from local disk for at most this much size or time as long as they have been moved to tiered storage. Now it will mean that anything beyond this size and time will be found only in tiered storage. Isn't this a "public facing change" and thus requiring some announcements? This was the original plan and is in line with the behaviour of local-log segments.
If a topic is deprecated and will not receive any more data in the future, the user will expect all the data in that topic to be removed once the retention time has passed; otherwise it can fail to meet certain compliance requirements. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
clolov commented on PR #14766: URL: https://github.com/apache/kafka/pull/14766#issuecomment-1820664052 I like this approach because it moves in the direction of us configuring how much data we would like to store locally based on size and time. However, in my head this introduces 2 problems. The first one is that I no longer see a point in segment.bytes and segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect to tiered topics. If a person says "hey, I only want 4GB of data or data from the last 10 minutes around" then why would they ever need to configure how often a segment should be closed? If this is the case shouldn't this be followed by ignoring those two properties for tiered topics? The second one is that you will be changing the definition of local.retention. Prior to this change it meant that closed segments will be served from local disk for at most this much size or time as long as they have been moved to tiered storage. Now it will mean that anything beyond this size and time will be found only in tiered storage. Isn't this a "public facing change" and thus requiring some announcements? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1399008945 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,25 +1425,71 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: SegmentDeletionReason): Int = { -def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { - val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) - - // Check not to delete segments which are not yet copied to tiered storage if remote log is enabled. - (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage)) && -// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start -// offset can never exceed it. -highWatermark >= upperBoundOffset && -predicate(segment, nextSegmentOpt) -} lock synchronized { - val deletable = localLog.deletableSegments(shouldDelete) + val deletable = deletableSegments(predicate) if (deletable.nonEmpty) deleteSegments(deletable, reason) else 0 } } + /** + * Find segments starting from the oldest until the user-supplied predicate is false. + * A final segment that is empty will never be returned. + * + * @param predicate A function that takes in a candidate log segment, the next higher segment + * (if there is one). It returns true iff the segment is deletable. + * @return the segments ready to be deleted + */ + private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { +def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = { + // Segments are eligible for deletion when: + //1. 
they are uploaded to the remote storage + if (remoteLogEnabled()) { +upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage + } else { +true + } +} + +if (localLog.segments.isEmpty) { + Seq.empty +} else { + val deletable = ArrayBuffer.empty[LogSegment] + val segmentsIterator = localLog.segments.values.iterator + var segmentOpt = nextOption(segmentsIterator) + var shouldRoll = false + while (segmentOpt.isDefined) { +val segment = segmentOpt.get +val nextSegmentOpt = nextOption(segmentsIterator) +val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0 +val upperBoundOffset = if (nextSegmentOpt.nonEmpty) nextSegmentOpt.get.baseOffset() else logEndOffset +// We don't delete segments with offsets at or beyond the high watermark to ensure that the log start +// offset can never exceed it. +val predicateResult = highWatermark >= upperBoundOffset && predicate(segment, nextSegmentOpt) + +// Roll the active segment when it breaches the configured retention policy. The rolled segment will be +// eligible for deletion and gets removed in the next iteration. +if (predicateResult && !isLastSegmentAndEmpty && remoteLogEnabled() && nextSegmentOpt.isEmpty) { Review Comment: Can we have the below condition which is easy to understand or some thing better? ``` predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty && segment.size > 0 // active segment is not empty ``` Another way is to keep the last segment's predicate result and its size and do the check after the while loop. ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +im
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
kamalcph commented on code in PR #14766: URL: https://github.com/apache/kafka/pull/14766#discussion_r1395386695 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java: ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test to verify that the active segment is rolled and uploaded to remote storage when the segment breaches the + * local log retention policy. 
+ */ +public class RollAndOffloadActiveSegmentTest extends TieredStorageTestHarness { + +@Override +public int brokerCount() { +return 1; +} + +@Override +protected void writeTestSpecifications(TieredStorageTestBuilder builder) { +final Integer broker0 = 0; +final String topicA = "topicA"; +final Integer p0 = 0; +final Integer partitionCount = 1; +final Integer replicationFactor = 1; +final Integer maxBatchCountPerSegment = 1; +final Map> replicaAssignment = null; +final boolean enableRemoteLogStorage = true; + +// Create topicA with 1 partition, 1 RF and enabled with remote storage. +builder.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment, +enableRemoteLogStorage) +// update the topic config such that it triggers the rolling of the active segment +.updateTopicConfig(topicA, configsToBeAdded(), Collections.emptyList()) +// produce events to partition 0 and expect all the 4 segments to be offloaded +.expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2")) +.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3")) +.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L) +.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"), +new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3")) +// consume from the beginning of the topic to read data from local and remote storage +.expectFetchFromTieredStorage(broker0, topicA, p0, 4) +.consume(topicA, p0, 0L, 4, 4); +} + +private Map configsToBeAdded() { +// Update localLog retentionMs to 1 ms and segment roll-time to 10 ms +Map topicConfigs = new HashMap<>(); +topicConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "1"); Review Comment: `localRetentionMs` is set to 1 ms, once it passed, the active segment will be rolled over and 
offloaded to the remote storage. Note that all the tiered-storage integration tests specify `localRetentionBytes` as 1 byte (See: [TieredStorageTestUtils](https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java?L174)) to offload the passive segments ASAP while keeping the active segment on local storage, so that reading the data from both local and remote storage can be asserted. We don't want to change the [deleteRetentionSizeBreachedSegments](https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L1502-1513) behaviour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...