This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1a73470d1e [feat] PIP-468: sealed-segment retention GC for scalable 
topics (#25668)
c1a73470d1e is described below

commit c1a73470d1e4e30dfb30aebd785d6e57ce6482df
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 15:50:44 2026 -0700

    [feat] PIP-468: sealed-segment retention GC for scalable topics (#25668)
---
 .../broker/service/persistent/PersistentTopic.java |   7 +
 .../service/scalable/ScalableTopicController.java  | 349 ++++++++++++++++++++-
 .../scalable/ScalableTopicControllerTest.java      | 142 +++++++++
 3 files changed, 496 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f98da6a5a89..bf795e85c3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3550,6 +3550,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public void checkGC() {
+        if (TopicName.get(topic).isSegment()) {
+            // Segment-backing topics are owned by ScalableTopicController; 
its GC tick
+            // decides when a sealed segment is drained + retention-expired 
and deletes
+            // the topic. Letting v4 inactive-topic-GC race against that would 
risk the
+            // broker quietly tearing down a segment the controller still 
considers live.
+            return;
+        }
         if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 4a7b15a1e15..552ca321d84 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -19,20 +19,30 @@
 package org.apache.pulsar.broker.service.scalable;
 
 import io.github.merlimat.slog.Logger;
+import java.time.Clock;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.scalable.HashRange;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
@@ -53,17 +63,27 @@ public class ScalableTopicController {
     private static final Logger LOG = 
Logger.get(ScalableTopicController.class);
     private final Logger log;
 
+    /** Default cadence for the sealed-segment GC tick on the leader. */
+    static final Duration DEFAULT_GC_INTERVAL = Duration.ofSeconds(60);
+
     @Getter
     private final TopicName topicName;
     private final ScalableTopicResources resources;
     private final BrokerService brokerService;
     private final LeaderElection<String> leaderElection;
+    /** Wall-clock source used for sealed-segment retention math. Tests 
override. */
+    private final Clock clock;
+    /** Cadence of the GC tick. Tests override to a small value. */
+    private final Duration gcInterval;
 
     private volatile SegmentLayout currentLayout;
 
     /** Per-subscription consumer tracking. */
     private final ConcurrentHashMap<String, SubscriptionCoordinator> 
subscriptions = new ConcurrentHashMap<>();
 
+    /** Sealed-segment GC scheduled task. Non-null only while this broker is 
leader. */
+    private volatile ScheduledFuture<?> gcTask;
+
     @Getter
     private volatile LeaderElectionState leaderState = 
LeaderElectionState.NoLeader;
 
@@ -73,9 +93,24 @@ public class ScalableTopicController {
                             ScalableTopicResources resources,
                             BrokerService brokerService,
                             CoordinationService coordinationService) {
+        this(topicName, resources, brokerService, coordinationService,
+                Clock.systemUTC(), DEFAULT_GC_INTERVAL);
+    }
+
+    /**
+     * Test constructor: overrides the wall-clock source and the GC tick 
cadence.
+     */
+    ScalableTopicController(TopicName topicName,
+                            ScalableTopicResources resources,
+                            BrokerService brokerService,
+                            CoordinationService coordinationService,
+                            Clock clock,
+                            Duration gcInterval) {
         this.topicName = topicName;
         this.resources = resources;
         this.brokerService = brokerService;
+        this.clock = clock;
+        this.gcInterval = gcInterval;
         this.log = LOG.with().attr("topic", topicName).build();
         this.leaderElection = coordinationService.getLeaderElection(
                 String.class,
@@ -90,6 +125,12 @@ public class ScalableTopicController {
      */
     private void onLeaderStateChange(LeaderElectionState state) {
         log.info().attr("state", state).log("Leader state change for scalable 
topic");
+        if (state != LeaderElectionState.Leading) {
+            // Stepped down (or never was leader). Stop the GC tick so the 
deposed leader
+            // doesn't race the new one on layout writes / backing-topic 
deletes. The new
+            // leader's initialize() will reschedule.
+            cancelGcTask();
+        }
         if (state == LeaderElectionState.NoLeader && !closed) {
             initialize().exceptionally(ex -> {
                 log.warn().exceptionMessage(ex).log("Failed to re-elect after 
NoLeader");
@@ -119,12 +160,52 @@ public class ScalableTopicController {
                 })
                 .thenCompose(__ -> {
                     if (isLeader()) {
+                        scheduleGcTask();
                         return restoreSessionsFromStore();
                     }
                     return CompletableFuture.completedFuture(null);
                 });
     }
 
+    /**
+     * Schedule the periodic sealed-segment GC tick. Only fires on the 
controller leader;
+     * idempotent (re-entry just no-ops). Cancelled on close / leader-loss.
+     */
+    private synchronized void scheduleGcTask() {
+        if (closed || gcTask != null) {
+            return;
+        }
+        gcTask = scheduler().scheduleAtFixedRate(this::runGcTickSafely,
+                gcInterval.toMillis(), gcInterval.toMillis(), 
TimeUnit.MILLISECONDS);
+    }
+
+    private synchronized void cancelGcTask() {
+        if (gcTask != null) {
+            gcTask.cancel(false);
+            gcTask = null;
+        }
+    }
+
+    private ScheduledExecutorService scheduler() {
+        return brokerService.getPulsar().getExecutor();
+    }
+
+    private void runGcTickSafely() {
+        if (!isLeader() || closed) {
+            return;
+        }
+        try {
+            runGcTickAsync().exceptionally(ex -> {
+                log.warn().exceptionMessage(ex).log("Scalable-topic GC tick 
failed");
+                return null;
+            });
+        } catch (Throwable t) {
+            // Defensive: scheduleAtFixedRate would suppress the next firing 
if a tick
+            // throws synchronously, so log and swallow here.
+            log.warn().exception(t).log("Scalable-topic GC tick threw");
+        }
+    }
+
     /**
      * Load persisted subscriptions and consumer registrations from the 
metadata store and
      * install them into per-subscription {@link SubscriptionCoordinator} 
instances. Called
@@ -288,7 +369,7 @@ public class ScalableTopicController {
         // Single timestamp shared by the local preview and the CAS-retried 
metadata update,
         // so the children's createdAtMs and the parent's sealedAtMs always 
agree even if the
         // CAS retries due to concurrent writers.
-        final long nowMs = System.currentTimeMillis();
+        final long nowMs = clock.millis();
 
         // Compute the new layout locally to derive child segment info
         SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs);
@@ -337,7 +418,7 @@ public class ScalableTopicController {
 
         // Single timestamp shared by the local preview and the CAS-retried 
metadata
         // update — see splitSegment for the rationale.
-        final long nowMs = System.currentTimeMillis();
+        final long nowMs = clock.millis();
 
         // Compute the new layout locally to derive merged segment info
         SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1, 
segmentId2, nowMs);
@@ -619,10 +700,274 @@ public class ScalableTopicController {
                 });
     }
 
+    // --- Sealed-segment GC ---
+
+    /**
+     * One iteration of the sealed-segment GC. For every sealed segment in the 
current
+     * layout whose retention window has expired, polls every known 
subscription's
+     * backlog on that segment; if all subscriptions are drained, prunes the 
segment
+     * from the DAG (CAS) and deletes its backing managed-ledger topic.
+     *
+     * <p>The retention window is resolved from topic-policies on the parent
+     * {@code topic://...} → namespace policy → broker default, the same 
precedence
+     * Pulsar uses for regular topics.
+     *
+     * <p>Visible for tests; in production it's invoked by the scheduled task.
+     */
+    CompletableFuture<Void> runGcTickAsync() {
+        if (!isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        final SegmentLayout layout = currentLayout;
+        if (layout == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Candidates: sealed segments past their retention horizon. We resolve
+        // retention once per tick — cheap, and avoids per-segment policy 
lookups.
+        return resolveRetentionMillisAsync()
+                .thenCompose(retentionMs -> {
+                    if (retentionMs == null) {
+                        // Negative / unset → retain forever. No GC this tick.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    long now = clock.millis();
+                    List<SegmentInfo> candidates = new ArrayList<>();
+                    for (SegmentInfo seg : layout.getAllSegments().values()) {
+                        if (seg.isSealed() && seg.sealedAtMs() > 0
+                                && (now - seg.sealedAtMs()) >= retentionMs) {
+                            candidates.add(seg);
+                        }
+                    }
+                    if (candidates.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return pruneEligibleAsync(candidates);
+                });
+    }
+
+    /**
+     * For each candidate sealed segment, check that every existing 
subscription has
+     * drained it (backlog == 0); prune the ones that pass. The drain checks 
fan out
+     * concurrently, but the resulting layout mutation is coalesced into a 
<em>single</em>
+     * CAS write so multiple eligible segments don't compete on the same 
metadata znode.
+     *
+     * <p><b>Subscription-type behaviour.</b> The drain check is the 
per-segment backlog
+     * admin endpoint — Pulsar's standard cursor-position view, which works 
the same way
+     * for STREAM (Exclusive) and QUEUE (Shared) subscriptions: a sealed 
segment with
+     * cursor at the end reports backlog 0. For CHECKPOINT subscriptions there 
is no
+     * broker-side cursor, the endpoint returns {@code NotFoundException}, and
+     * {@code isSegmentDrained} reports {@code false} — the segment is treated 
as
+     * "still in use" and never pruned while a CHECKPOINT subscription is 
registered.
+     *
+     * <p><b>Parent-vs-child ordering.</b> Sealed segments form a DAG; pruning 
is allowed
+     * in any order because the active leaves always cover the full hash 
range, and the
+     * managed-ledger storage of each segment is independent. {@link 
SegmentLayout#pruneSegment}
+     * rewrites the parent/child edges, so consumers using the post-prune 
layout see the
+     * pruned segment as "no longer present" — equivalent to "drained" for 
parent-drain
+     * ordering.
+     */
+    private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> 
candidates) {
+        return resources.listSubscriptionsAsync(topicName)
+                .thenCompose(subs -> {
+                    // Fan out drain checks; collect the survivors.
+                    List<CompletableFuture<SegmentInfo>> filtered = new 
ArrayList<>();
+                    for (SegmentInfo seg : candidates) {
+                        filtered.add(prunable(seg, subs)
+                                .thenApply(ok -> ok ? seg : null)
+                                .exceptionally(ex -> {
+                                    log.warn().attr("segmentId", 
seg.segmentId())
+                                            .exceptionMessage(ex)
+                                            .log("GC: failed to evaluate 
prunability;"
+                                                    + " will retry on next 
tick");
+                                    return null;
+                                }));
+                    }
+                    return 
CompletableFuture.allOf(filtered.toArray(CompletableFuture[]::new))
+                            .thenApply(__ -> {
+                                List<SegmentInfo> drained = new ArrayList<>();
+                                for (var f : filtered) {
+                                    SegmentInfo s = f.join();
+                                    if (s != null) {
+                                        drained.add(s);
+                                    }
+                                }
+                                return drained;
+                            });
+                })
+                .thenCompose(this::pruneAllAsync);
+    }
+
+    /**
+     * Coalesce all drained-and-eligible segments into a single 
layout-mutation CAS,
+     * then fan out the per-segment backing-topic deletes. This is the path 
that
+     * actually mutates state. Re-validates leadership before the CAS — drain 
checks
+     * can take seconds, leadership may have flipped in the meantime, and we 
don't
+     * want a deposed leader writing layout updates.
+     */
+    private CompletableFuture<Void> pruneAllAsync(List<SegmentInfo> drained) {
+        if (drained.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        if (!isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        for (SegmentInfo s : drained) {
+            log.info().attr("segmentId", s.segmentId())
+                    .attr("sealedAtMs", s.sealedAtMs())
+                    .log("GC: pruning sealed segment past retention");
+        }
+        return resources.updateScalableTopicAsync(topicName, md -> {
+            SegmentLayout latest = SegmentLayout.fromMetadata(md);
+            SegmentLayout updated = latest;
+            for (SegmentInfo s : drained) {
+                // Re-validate per segment: another writer (or a previous 
failed
+                // tick of this same loop) may have already pruned it.
+                if (updated.getAllSegments().containsKey(s.segmentId())) {
+                    updated = updated.pruneSegment(s.segmentId());
+                }
+            }
+            return updated == latest ? md : 
updated.toMetadata(md.getProperties());
+        }).thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topicName, true))
+          .thenCompose(optMd -> {
+              currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+              return notifySubscriptions(currentLayout);
+          })
+          .thenCompose(__ -> {
+              CompletableFuture<?>[] deletes = drained.stream()
+                      .map(this::deleteSegmentBackingTopic)
+                      .toArray(CompletableFuture[]::new);
+              return CompletableFuture.allOf(deletes);
+          })
+          .thenAccept(__ -> {
+              for (SegmentInfo s : drained) {
+                  log.info().attr("segmentId", s.segmentId())
+                          .log("GC: segment pruned + backing topic deleted");
+              }
+          });
+    }
+
+    private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String> 
subs) {
+        if (subs.isEmpty()) {
+            // No subscribers ever attached / all unsubscribed → nothing left 
to drain.
+            return CompletableFuture.completedFuture(true);
+        }
+        CompletableFuture<Boolean>[] checks = subs.stream()
+                .map(sub -> isSegmentDrained(seg, sub))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(checks)
+                .thenApply(__ -> {
+                    for (CompletableFuture<Boolean> c : checks) {
+                        if (!c.join()) {
+                            return false;
+                        }
+                    }
+                    return true;
+                });
+    }
+
+    /**
+     * Delete the segment's backing storage via the {@code scalableTopics} 
admin
+     * endpoint, which understands the {@code segment://} naming scheme and 
routes
+     * to the segment's owning broker. Failures are best-effort: the controller
+     * has already pruned the segment from the layout (the point of no return),
+     * so a failed delete is just leaked storage that the next tick will retry.
+     */
+    private CompletableFuture<Void> deleteSegmentBackingTopic(SegmentInfo seg) 
{
+        String name = toSegmentPersistentName(seg);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .scalableTopics().deleteSegmentAsync(name, /* force */ 
true)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // Already gone — fine.
+                            return null;
+                        }
+                        log.warn().attr("segment", 
name).exceptionMessage(cause)
+                                .log("GC: failed to delete backing segment 
topic;"
+                                        + " will retry on next tick");
+                        return null;
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /**
+     * Resolve the effective retention-time-in-millis for this scalable topic 
by
+     * layering: topic-policy on the parent {@code topic://...} → namespace 
policy →
+     * broker config default. Returns {@code null} if retention is unset or 
negative
+     * (= keep forever) — the GC tick treats that as "skip".
+     */
+    private CompletableFuture<Long> resolveRetentionMillisAsync() {
+        TopicPoliciesService topicPoliciesService =
+                brokerService.getPulsar().getTopicPoliciesService();
+        // Topic-level (override) layer.
+        return topicPoliciesService.getTopicPoliciesAsync(topicName,
+                        TopicPoliciesService.GetType.LOCAL_ONLY)
+                .thenCompose(localOpt -> {
+                    Optional<RetentionPolicies> rp = localOpt
+                            .map(TopicPolicies::getRetentionPolicies)
+                            .filter(java.util.Objects::nonNull);
+                    if (rp.isPresent()) {
+                        return 
CompletableFuture.completedFuture(toRetentionMillis(rp.get()));
+                    }
+                    // Namespace layer.
+                    NamespaceName ns = topicName.getNamespaceObject();
+                    return brokerService.getPulsar().getPulsarResources()
+                            .getNamespaceResources()
+                            .getPoliciesAsync(ns)
+                            .thenApply(nsOpt -> {
+                                RetentionPolicies nsRp = nsOpt
+                                        .map(p -> p.retention_policies)
+                                        .orElse(null);
+                                if (nsRp != null) {
+                                    return toRetentionMillis(nsRp);
+                                }
+                                return 
defaultRetentionMillisFromBrokerConfig();
+                            });
+                });
+    }
+
+    private static Long toRetentionMillis(RetentionPolicies rp) {
+        if (rp.getRetentionTimeInMinutes() < 0) {
+            return null; // keep forever
+        }
+        return TimeUnit.MINUTES.toMillis(rp.getRetentionTimeInMinutes());
+    }
+
+    private Long defaultRetentionMillisFromBrokerConfig() {
+        var conf = brokerService.getPulsar().getConfig();
+        if (conf == null) {
+            return null;
+        }
+        int min = conf.getDefaultRetentionTimeInMinutes();
+        return min < 0 ? null : TimeUnit.MINUTES.toMillis(min);
+    }
+
+    /** Test hook: count of sealed segments currently in the layout. */
+    int sealedSegmentCount() {
+        SegmentLayout layout = currentLayout;
+        if (layout == null) {
+            return 0;
+        }
+        int n = 0;
+        for (SegmentInfo s : layout.getAllSegments().values()) {
+            if (s.isSealed()) {
+                n++;
+            }
+        }
+        return n;
+    }
+
     // --- Lifecycle ---
 
     public CompletableFuture<Void> close() {
         closed = true;
+        cancelGcTask();
         // Stop each coordinator's drain poller before clearing — otherwise 
the scheduler
         // task keeps running after the controller goes away.
         subscriptions.values().forEach(SubscriptionCoordinator::close);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 1ebd3f13ed0..c00b1923504 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -148,6 +148,13 @@ public class ScalableTopicControllerTest {
         return new ScalableTopicController(tn, resources, brokerService, 
coordinationService);
     }
 
+    private ScalableTopicController newControllerWithClock(TopicName tn,
+                                                           java.time.Clock 
clock,
+                                                           java.time.Duration 
gcInterval) {
+        return new ScalableTopicController(tn, resources, brokerService, 
coordinationService,
+                clock, gcInterval);
+    }
+
     // --- initialize() ---
 
     @Test
@@ -449,6 +456,141 @@ public class ScalableTopicControllerTest {
         }
     }
 
+    // --- Sealed-segment GC ---
+
+    /**
+     * After a split, the parent is sealed; with no live subscriptions and a 
small
+     * configured retention the GC tick should prune the parent from the 
layout and
+     * delete its backing topic.
+     */
+    @Test
+    public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws 
Exception {
+        // Inject GC-related mocks (topic-policies + namespace policies + 
delete).
+        installGcMocks(/* nsRetentionMinutes */ 1);
+
+        // Use a controllable clock so we can fast-forward past the retention 
window.
+        long startMs = 1_700_000_000_000L;
+        AdjustableClock clock = new AdjustableClock(startMs);
+        if (controller != null) {
+            controller.close().join();
+        }
+        controller = newControllerWithClock(topicName, clock,
+                java.time.Duration.ofHours(1)); // GC interval irrelevant — we 
drive ticks manually
+        controller.initialize().get();
+
+        int sealedBefore = controller.sealedSegmentCount();
+        // Split segment 0 → seg 0 sealed at startMs, children created at 
startMs.
+        controller.splitSegment(0).get();
+        assertEquals(controller.sealedSegmentCount(), sealedBefore + 1);
+
+        // Tick at the seal time — retention not yet elapsed; nothing pruned.
+        controller.runGcTickAsync().get();
+        
assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick within retention window must not prune");
+
+        // Fast-forward past 1 minute; tick should now prune segment 0.
+        clock.set(startMs + java.util.concurrent.TimeUnit.MINUTES.toMillis(1) 
+ 1_000L);
+        controller.runGcTickAsync().get();
+        
assertFalse(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick past retention must prune the sealed segment");
+        // Backing topic delete was issued via the segment-aware admin call.
+        verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean());
+    }
+
+    /**
+     * If retention is set to "keep forever" (negative value), the GC tick is 
a no-op
+     * even for sealed + drained segments.
+     */
+    @Test
+    public void testGcTickRespectsKeepForeverRetention() throws Exception {
+        installGcMocks(/* nsRetentionMinutes */ -1);
+
+        long now = 1_700_000_000_000L;
+        if (controller != null) {
+            controller.close().join();
+        }
+        java.time.Clock fixed = java.time.Clock.fixed(
+                java.time.Instant.ofEpochMilli(now + 365L * 86_400_000L),
+                java.time.ZoneOffset.UTC);
+        controller = newControllerWithClock(topicName, fixed, 
java.time.Duration.ofHours(1));
+        controller.initialize().get();
+        controller.splitSegment(0).get();
+
+        controller.runGcTickAsync().get();
+        
assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "negative retention must keep sealed segments forever");
+    }
+
+    /** Settable {@link java.time.Clock} for the GC tick tests. */
+    private static final class AdjustableClock extends java.time.Clock {
+        private volatile long nowMs;
+
+        AdjustableClock(long initialMs) {
+            this.nowMs = initialMs;
+        }
+
+        void set(long nowMs) {
+            this.nowMs = nowMs;
+        }
+
+        @Override
+        public java.time.ZoneId getZone() {
+            return java.time.ZoneOffset.UTC;
+        }
+
+        @Override
+        public java.time.Clock withZone(java.time.ZoneId zone) {
+            return this; // tests don't care about zone
+        }
+
+        @Override
+        public java.time.Instant instant() {
+            return java.time.Instant.ofEpochMilli(nowMs);
+        }
+
+        @Override
+        public long millis() {
+            return nowMs;
+        }
+    }
+
+    /**
+     * Wire up the mocks the GC tick needs: empty topic-policies (so retention 
falls
+     * through to namespace), a namespace policy with the requested retention, 
a
+     * "drained" segment-backlog response (cursor at end), and a successful 
topic
+     * delete admin call.
+     */
+    @SuppressWarnings("unchecked")
+    private void installGcMocks(int nsRetentionMinutes) {
+        // Topic-policies service: no policies set on the topic.
+        var tps = 
mock(org.apache.pulsar.broker.service.TopicPoliciesService.class);
+        when(pulsar.getTopicPoliciesService()).thenReturn(tps);
+        when(tps.getTopicPoliciesAsync(any(),
+                
any(org.apache.pulsar.broker.service.TopicPoliciesService.GetType.class)))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        // Namespace policy with the requested retention. Wired through 
PulsarResources.
+        var pulsarResources = 
mock(org.apache.pulsar.broker.resources.PulsarResources.class);
+        var namespaceResources = 
mock(org.apache.pulsar.broker.resources.NamespaceResources.class);
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
+        
when(pulsarResources.getNamespaceResources()).thenReturn(namespaceResources);
+        org.apache.pulsar.common.policies.data.Policies nsPolicies =
+                new org.apache.pulsar.common.policies.data.Policies();
+        nsPolicies.retention_policies =
+                new org.apache.pulsar.common.policies.data.RetentionPolicies(
+                        nsRetentionMinutes, /* sizeMB */ -1);
+        when(namespaceResources.getPoliciesAsync(any()))
+                
.thenReturn(CompletableFuture.completedFuture(Optional.of(nsPolicies)));
+
+        // No active subscriptions on the parent → nothing to drain → segment 
is
+        // immediately considered prunable on retention expiry.
+        // (resources.listSubscriptionsAsync returns [] from the in-memory 
store by default.)
+
+        // Backing-topic delete succeeds (segment-aware admin call).
+        when(scalableTopics.deleteSegmentAsync(anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
     // --- ConsumerRegistration record sanity ---
 
     @Test

Reply via email to