This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f7cc5db6f7 [fix][broker] PIP-468: skip deleted segments in checkpoint consumer (#25719)
7f7cc5db6f7 is described below

commit 7f7cc5db6f7321f27a4154133c4225ea3afee164
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 10:17:47 2026 -0700

    [fix][broker] PIP-468: skip deleted segments in checkpoint consumer (#25719)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  35 +++++-
 .../apache/pulsar/broker/admin/v2/Segments.java    |  11 +-
 .../pulsar/broker/service/BrokerService.java       |  10 ++
 .../service/scalable/ScalableTopicController.java  |  33 ++++-
 .../scalable/ScalableTopicControllerTest.java      |  50 ++++++++
 ...V5CheckpointConsumerSkipDeletedSegmentTest.java | 134 +++++++++++++++++++++
 .../client/impl/v5/ScalableCheckpointConsumer.java |  45 +++++++
 7 files changed, 312 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 32468c48892..212d0019119 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -174,9 +175,8 @@ public class ScalableTopics extends AdminResource {
                     Map<String, String> props = properties != null ? 
properties : Map.of();
                     ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(
                             numInitialSegments, props);
-                    // Segment persistent topics are auto-created on demand 
when clients connect,
-                    // so we only need to store the metadata here.
-                    return resources().createScalableTopicAsync(tn, metadata);
+                    return resources().createScalableTopicAsync(tn, metadata)
+                            .thenCompose(ignored -> 
createInitialSegmentTopicsAsync(tn, metadata));
                 })
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", 
clientAppId()).attr("topic", tn)
@@ -198,6 +198,35 @@ public class ScalableTopics extends AdminResource {
                 });
     }
 
+    /**
+     * Materialize the backing persistent topic of every segment in a freshly created layout.
+     *
+     * <p>Segment topics are never auto-created when a client connects (see
+     * {@code BrokerService.isAllowAutoTopicCreationAsync}); the only way they come
+     * into existence is the explicit-create path. That is why the initial segment(s)
+     * must be created here, at scalable-topic creation time, before any producer or
+     * consumer shows up.
+     *
+     * <p>Each create is issued through the broker's internal admin client so that it
+     * reaches the broker owning that segment's bundle, which may not be the broker
+     * serving this REST request.
+     */
+    private CompletableFuture<Void> createInitialSegmentTopicsAsync(
+            TopicName parentTopic, ScalableTopicMetadata metadata) {
+        try {
+            var brokerAdmin = pulsar().getAdminClient();
+            List<CompletableFuture<Void>> pendingCreates = new ArrayList<>();
+            for (SegmentInfo segment : metadata.getSegments().values()) {
+                var segTopic = SegmentTopicName.fromParent(
+                        parentTopic, segment.hashRange(), segment.segmentId());
+                pendingCreates.add(brokerAdmin.scalableTopics().createSegmentAsync(segTopic.toString(), List.of()));
+            }
+            return FutureUtil.waitForAll(pendingCreates);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
     // --- Get metadata ---
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 507119518b0..189e632b7e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -41,6 +42,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -98,7 +100,11 @@ public class Segments extends AdminResource {
 
         validateSuperUserAccessAsync()
                 .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
-                .thenCompose(__ -> 
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
+                // Explicit create — segments don't go through the auto-create 
policy
+                // (BrokerService.isAllowAutoTopicCreationAsync forbids 
segment auto-create).
+                .thenCompose(__ -> pulsar().getBrokerService()
+                        .getTopic(segmentTopic.toString(), true)
+                        .thenApply(Optional::get))
                 .thenCompose(topic -> {
                     log.info().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
                             .log("Created segment topic");
@@ -390,7 +396,8 @@ public class Segments extends AdminResource {
                     if (optTopic.isEmpty()) {
                         return CompletableFuture.completedFuture(null);
                     }
-                    return optTopic.get().delete().thenApply(__ -> null);
+                    Topic t = optTopic.get();
+                    return (force ? t.deleteForcefully() : 
t.delete()).thenApply(__ -> null);
                 })
                 .thenAccept(__ -> {
                     log.info().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7ecb6f2bdfd..71003a4d5e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -165,6 +165,7 @@ import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
@@ -3900,6 +3901,15 @@ public class BrokerService implements Closeable {
             return CompletableFuture.completedFuture(false);
         }
 
+        // Segment topics (PIP-468 scalable topics) are explicitly created by 
the
+        // ScalableTopicController via the /admin/v2/segments endpoint. They 
must
+        // never be auto-created on connect — otherwise a producer/consumer 
racing
+        // a controller-driven delete (post-prune retention GC, force-delete) 
would
+        // silently re-create the topic with default schema and mask the 
deletion.
+        if (topicName.getDomain() == TopicDomain.segment) {
+            return CompletableFuture.completedFuture(false);
+        }
+
         //Other system topics can be created automatically
         if (pulsar.getConfiguration().isSystemTopicEnabled() && 
isSystemTopic(topicName)) {
             return CompletableFuture.completedFuture(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 2d6c61e2256..cf43239f126 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -161,12 +161,43 @@ public class ScalableTopicController {
                 .thenCompose(__ -> {
                     if (isLeader()) {
                         scheduleGcTask();
-                        return restoreSessionsFromStore();
+                        return ensureActiveSegmentsExist()
+                                .thenCompose(___ -> 
restoreSessionsFromStore());
                     }
                     return CompletableFuture.completedFuture(null);
                 });
     }
 
+    /**
+     * Heal active segments whose backing topics are missing, e.g. after a
+     * {@code createScalableTopic} call committed metadata but crashed before all
+     * initial segments were materialized, or after an active segment was
+     * force-deleted.
+     *
+     * <p>Safe to repeat: asking the broker to create a segment that already exists
+     * just loads the existing topic.
+     *
+     * <p>Sealed segments are deliberately left alone. If a sealed segment's backing
+     * topic is gone, its data is permanently gone (retention ran, or it was deleted
+     * explicitly), and recreating an empty topic would hide that loss. The V5
+     * checkpoint consumer instead skips sealed segments whose topics report
+     * {@code TopicDoesNotExist}.
+     */
+    private CompletableFuture<Void> ensureActiveSegmentsExist() {
+        // Per-segment failures are logged and swallowed; they never fail initialization.
+        CompletableFuture<?>[] ensured = currentLayout.getActiveSegments().values().stream()
+                .map(segment -> createSegmentTopic(segment, List.of())
+                        .exceptionally(ex -> {
+                            log.warn().attr("segmentId", segment.segmentId())
+                                    .exceptionMessage(ex)
+                                    .log("Failed to ensure active segment topic at controller init; "
+                                            + "next attempt to use this segment will retry");
+                            return null;
+                        }))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(ensured);
+    }
+
     /**
      * Schedule the periodic sealed-segment GC tick. Only fires on the 
controller leader;
      * idempotent (re-entry just no-ops). Cancelled on close / leader-loss.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 9da30e662d4..2eb78a8de40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -22,7 +22,10 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -169,6 +172,49 @@ public class ScalableTopicControllerTest {
         assertEquals(layout.getEpoch(), 0);
     }
 
+    /**
+     * Recovery path: {@code createScalableTopic} may commit metadata and then crash
+     * before every initial segment topic exists. Whenever the controller initializes
+     * (broker restart, bundle ownership transfer, leader re-election) the leader must
+     * therefore issue a create for every active segment. Against the mocked
+     * {@code scalableTopics} admin this is checked by counting one create call per
+     * active segment; on a real broker the call is idempotent and reloads existing ones.
+     */
+    @Test
+    public void testInitializeRecreatesMissingActiveSegments() throws Exception {
+        controller.initialize().get();
+
+        // Every one of the INITIAL_SEGMENTS active segments must have been asked to
+        // (re)materialize by the leader's initialize().
+        int expectedCreateCalls = INITIAL_SEGMENTS;
+        verify(scalableTopics, times(expectedCreateCalls)).createSegmentAsync(anyString(), any());
+    }
+
+    /**
+     * Counterpart to the recovery test: a controller that never becomes leader must
+     * not create any segment topic. A losing follower is hard to build in this
+     * single-broker harness, so the scenario used is a controller for a topic with no
+     * metadata, whose initialize() fails before electLeader() / ensureActiveSegmentsExist().
+     * Segment topic names never equal the parent topic name, so ANY create call counts.
+     */
+    @Test
+    public void testInitializeDoesNotCreateSegmentsWhenNotLeader() throws Exception {
+        // Start from a clean slate so only the orphan controller's calls are observed.
+        clearInvocations(scalableTopics);
+        TopicName missing = TopicName.get("topic://tenant/ns/does-not-exist");
+        ScalableTopicController orphan = newController(missing);
+        try {
+            orphan.initialize().get();
+            fail("expected IllegalStateException for missing topic");
+        } catch (ExecutionException expected) {
+            // Bailed before leader-elect, as desired.
+        } finally {
+            orphan.close().join();
+        }
+        // anyString(): a segment create never targets the parent topic name itself.
+        verify(scalableTopics, times(0)).createSegmentAsync(anyString(), any());
+    }
+
     @Test
     public void testInitializeFailsWhenTopicMissing() throws Exception {
         TopicName missing = TopicName.get("topic://tenant/ns/does-not-exist");
@@ -344,6 +390,9 @@ public class ScalableTopicControllerTest {
     @Test
     public void testSplitSegment() throws Exception {
         controller.initialize().get();
+        // initialize() now eagerly creates the active segment topics for 
recovery —
+        // ignore those when counting what splitSegment itself triggered.
+        clearInvocations(scalableTopics);
         SegmentLayout before = controller.getLayout().get();
         long epochBefore = before.getEpoch();
         int activeBefore = before.getActiveSegments().size();
@@ -362,6 +411,7 @@ public class ScalableTopicControllerTest {
     @Test
     public void testMergeSegments() throws Exception {
         controller.initialize().get();
+        clearInvocations(scalableTopics);
         SegmentLayout before = controller.getLayout().get();
         long epochBefore = before.getEpoch();
         int activeBefore = before.getActiveSegments().size();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java
new file mode 100644
index 00000000000..cd63fabc28c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link CheckpointConsumer} survival when a segment's backing topic
+ * is gone. The controller's GC eventually deletes pruned segment topics after
+ * their retention window expires; a checkpoint consumer doesn't dictate retention,
+ * so when it restores against a DAG that still references a now-deleted segment
+ * the segment's data is genuinely gone and the consumer must silently skip it,
+ * delivering everything still on disk — the same observable shape as in-segment
+ * retention.
+ */
+public class V5CheckpointConsumerSkipDeletedSegmentTest extends V5ClientBaseTest {
+
+    /**
+     * Force-delete one of two segments and open a checkpoint consumer: it must
+     * subscribe to the survivor and skip the deleted segment silently, delivering
+     * all messages still on disk.
+     *
+     * <p>Segment topics are never auto-created (only the controller materializes
+     * them, see {@code BrokerService.isAllowAutoTopicCreationAsync}), so a delete
+     * sticks — the consumer's resubscribe attempt against the gone segment
+     * surfaces as {@code TopicDoesNotExist}, which the consumer must swallow.
+     */
+    @Test
+    public void testConsumerSkipsDeletedSegmentSilently() throws Exception {
+        String topic = newScalableTopic(2);
+        long deletedSegmentId = 0L;
+        long survivorSegmentId = 1L;
+
+        // Produce a batch with enough key variety that both segments receive messages.
+        // The producer is closed explicitly rather than via @Cleanup (which would close
+        // it a second time), so it is already gone when the segment is force-deleted.
+        int n = 100;
+        Producer<byte[]> producer = v5Client.newProducer(Schema.bytes())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < n; i++) {
+            byte[] payload = ("msg-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            producer.newMessage().key("k-" + i).value(payload).send();
+        }
+        producer.close();
+
+        // Ground truth: messages published to each segment, taken from its in-counter.
+        long survivorMsgCount = segmentMsgInCount(topic, survivorSegmentId);
+        long deletedMsgCount = segmentMsgInCount(topic, deletedSegmentId);
+        assertTrue(survivorMsgCount > 0,
+                "test setup: hash distribution put no messages on the survivor segment");
+        assertTrue(deletedMsgCount > 0,
+                "test setup: hash distribution put no messages on the segment to be deleted");
+
+        // Force-delete one segment's backing topic (only the topic, not the DAG entry) —
+        // the deterministic stand-in for the controller's GC pruning it after retention.
+        String victimTopicName = segmentTopicName(topic, deletedSegmentId);
+        admin.scalableTopics().deleteSegment(victimTopicName, true);
+
+        // Open the consumer. The DAG still references the deleted segment, so subscribing
+        // to it yields TopicDoesNotExist, which must be swallowed while the survivor
+        // reader keeps delivering. Creating the consumer must NOT throw.
+        @Cleanup
+        CheckpointConsumer<byte[]> consumer = v5Client.newCheckpointConsumer(Schema.bytes())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int received = 0;
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received < survivorMsgCount && System.currentTimeMillis() < deadline) {
+            Message<byte[]> msg = consumer.receive(Duration.ofMillis(500));
+            if (msg != null) {
+                received++;
+            }
+        }
+        assertEquals(received, survivorMsgCount,
+                "every survivor-segment message must still be delivered");
+    }
+
+    // --- Helpers ---
+
+    /** Full name of the backing topic of segment {@code segmentId} of scalable {@code topic}. */
+    private String segmentTopicName(String topic, long segmentId) throws Exception {
+        return SegmentTopicName.fromParent(org.apache.pulsar.common.naming.TopicName.get(topic),
+                segmentHashRange(topic, segmentId), segmentId).toString();
+    }
+
+    /** Messages published so far to a segment's backing topic (its msgInCounter) — the
+     * ground truth for "what we expect the consumer to deliver." Goes through the
+     * broker's in-process topic reference because segment topics aren't exposed
+     * via the regular {@code /admin/v2/persistent/.../stats} endpoint. */
+    private long segmentMsgInCount(String topic, long segmentId) throws Exception {
+        String segmentTopic = segmentTopicName(topic, segmentId);
+        var ref = getTopicReference(segmentTopic).orElseThrow(
+                () -> new AssertionError("segment topic not loaded: " + segmentTopic));
+        return ref.getStats(false, false, false).getMsgInCounter();
+    }
+
+    private org.apache.pulsar.common.scalable.HashRange segmentHashRange(String topic, long segmentId)
+            throws Exception {
+        var meta = admin.scalableTopics().getMetadata(topic);
+        var seg = meta.getSegments().get(segmentId);
+        if (seg == null) {
+            throw new AssertionError("segment " + segmentId + " not found in DAG of " + topic);
+        }
+        // Admin-client SegmentInfo / HashRange are wire-payload @Data classes, distinct
+        // from the broker-internal records — convert across the boundary.
+        var adminRange = seg.getHashRange();
+        return org.apache.pulsar.common.scalable.HashRange.of(adminRange.getStart(), adminRange.getEnd());
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 1eedbb14566..4bdefb216a7 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -354,9 +354,38 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
                 .thenApply(reader -> {
                     startReadLoop(reader, segment.segmentId());
                     return reader;
+                })
+                .exceptionally(ex -> {
+                    Throwable cause = ex instanceof CompletionException ce && 
ce.getCause() != null
+                            ? ce.getCause() : ex;
+                    if (isSegmentGoneError(cause)) {
+                        // The backing topic was deleted by the controller's 
GC after its
+                        // retention window elapsed. The consumer may be 
restoring from a
+                        // checkpoint that pre-dates the prune, or racing a 
layout update.
+                        // Either way, the segment's data is gone — skip it 
silently.
+                        log.info().attr("segmentId", segment.segmentId())
+                                .log("Segment backing topic deleted (retention 
expired); skipping");
+                        segmentReaders.remove(segment.segmentId());
+                        lastReceivedPositions.remove(segment.segmentId());
+                        return null;
+                    }
+                    throw ex instanceof CompletionException
+                            ? (CompletionException) ex : new 
CompletionException(ex);
                 });
     }
 
+    /**
+     * Whether {@code cause} signals that the segment's backing topic no longer exists:
+     * it was deleted by the controller's GC, or a layout update raced the post-prune delete.
+     */
+    private static boolean isSegmentGoneError(Throwable cause) {
+        boolean topicDoesNotExist =
+                cause instanceof org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
+        boolean notFound =
+                cause instanceof org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+        return topicDoesNotExist || notFound;
+    }
+
     private org.apache.pulsar.client.api.MessageId resolveStartPosition(long 
segmentId) {
         if (startPosition instanceof CheckpointV5 cp) {
             var pos = cp.segmentPositions().get(segmentId);
@@ -404,6 +433,22 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
                 log.info().attr("segmentId", segmentId)
                         .log("Sealed segment drained, closing reader");
                 segmentReaders.remove(segmentId);
+                lastReceivedPositions.remove(segmentId);
+                reader.closeAsync();
+                return null;
+            }
+            if (isSegmentGoneError(cause)) {
+                // Segment backing topic deleted underneath us — the 
controller's GC
+                // pruned it after retention expired. The DagWatch layout 
update would
+                // normally remove this segment from the assignment first, but 
a
+                // network blip or in-flight read can lose that race. Treat as
+                // drained: close the reader and drop the position tracker so a
+                // subsequent checkpoint() doesn't carry a stale ID for a topic
+                // that no longer exists.
+                log.info().attr("segmentId", segmentId)
+                        .log("Segment backing topic deleted (retention 
expired); closing reader");
+                segmentReaders.remove(segmentId);
+                lastReceivedPositions.remove(segmentId);
                 reader.closeAsync();
                 return null;
             }

Reply via email to