lhotari commented on code in PR #25668:
URL: https://github.com/apache/pulsar/pull/25668#discussion_r3189793659
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the current
+ * layout whose retention window has expired, polls every known subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
Review Comment:
`prunable` is both a method name and a lambda parameter name here. It would be
better to rename one of them for clarity.
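
For illustration, a minimal self-contained sketch of one possible rename. The names `isPrunableAsync` and `canPrune` are only suggestions, and the `String`/`Map` types and the helper bodies are hypothetical stand-ins for `SegmentInfo`, the subscription list, and the real controller methods, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PrunableRenameSketch {

    // Hypothetical stand-in for the PR's prunable(seg, subs): completes with
    // true when the recorded backlog for the segment is zero.
    static CompletableFuture<Boolean> isPrunableAsync(String segment, Map<String, Long> backlog) {
        return CompletableFuture.completedFuture(backlog.getOrDefault(segment, 0L) == 0L);
    }

    // Hypothetical stand-in for pruneSegmentAsync(seg): records the segment as pruned.
    static CompletableFuture<Void> pruneSegmentAsync(String segment, List<String> pruned) {
        pruned.add(segment);
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<List<String>> pruneEligibleAsync(
            List<String> candidates, Map<String, Long> backlog) {
        List<String> pruned = new ArrayList<>();
        List<CompletableFuture<Void>> perSegment = new ArrayList<>();
        for (String seg : candidates) {
            // The method name (isPrunableAsync) and the lambda parameter
            // (canPrune) no longer share a name.
            perSegment.add(isPrunableAsync(seg, backlog)
                    .thenCompose(canPrune -> canPrune
                            ? pruneSegmentAsync(seg, pruned)
                            : CompletableFuture.<Void>completedFuture(null)));
        }
        return CompletableFuture.allOf(perSegment.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> pruned);
    }
}
```

Renaming the method alone (for example to a verb phrase with the `Async` suffix used elsewhere in this class) would also resolve the clash while leaving the lambda untouched.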