This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f7cc5db6f7 [fix][broker] PIP-468: skip deleted segments in checkpoint
consumer (#25719)
7f7cc5db6f7 is described below
commit 7f7cc5db6f7321f27a4154133c4225ea3afee164
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 10:17:47 2026 -0700
[fix][broker] PIP-468: skip deleted segments in checkpoint consumer (#25719)
---
.../pulsar/broker/admin/v2/ScalableTopics.java | 35 +++++-
.../apache/pulsar/broker/admin/v2/Segments.java | 11 +-
.../pulsar/broker/service/BrokerService.java | 10 ++
.../service/scalable/ScalableTopicController.java | 33 ++++-
.../scalable/ScalableTopicControllerTest.java | 50 ++++++++
...V5CheckpointConsumerSkipDeletedSegmentTest.java | 134 +++++++++++++++++++++
.../client/impl/v5/ScalableCheckpointConsumer.java | 45 +++++++
7 files changed, 312 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 32468c48892..212d0019119 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -174,9 +175,8 @@ public class ScalableTopics extends AdminResource {
Map<String, String> props = properties != null ?
properties : Map.of();
ScalableTopicMetadata metadata =
ScalableTopicController.createInitialMetadata(
numInitialSegments, props);
- // Segment persistent topics are auto-created on demand
when clients connect,
- // so we only need to store the metadata here.
- return resources().createScalableTopicAsync(tn, metadata);
+ return resources().createScalableTopicAsync(tn, metadata)
+ .thenCompose(ignored ->
createInitialSegmentTopicsAsync(tn, metadata));
})
.thenAccept(__ -> {
log.info().attr("clientAppId",
clientAppId()).attr("topic", tn)
@@ -198,6 +198,35 @@ public class ScalableTopics extends AdminResource {
});
}
+    /**
+     * Materializes the backing persistent topic of every segment in the initial layout.
+     *
+     * <p>Segment topics are never auto-created when a client connects (see
+     * {@code BrokerService.isAllowAutoTopicCreationAsync}); they only exist once the
+     * explicit-create path has run. The initial segment(s) therefore have to be created
+     * here, before any producer or consumer arrives.
+     *
+     * <p>Every create is routed through the internal admin client so that it lands on the
+     * broker owning that segment's bundle, which can differ from the broker serving this
+     * REST call.
+     *
+     * @param parentTopic the scalable topic the segments belong to
+     * @param metadata the metadata describing the initial segments
+     * @return the aggregate of all segment-create requests as produced by
+     *         {@code FutureUtil.waitForAll}; a failed future carrying the caught exception
+     *         if issuing the requests throws synchronously
+     */
+    private CompletableFuture<Void> createInitialSegmentTopicsAsync(
+            TopicName parentTopic, ScalableTopicMetadata metadata) {
+        List<CompletableFuture<Void>> pendingCreates = new ArrayList<>();
+        try {
+            var adminClient = pulsar().getAdminClient();
+            for (SegmentInfo segment : metadata.getSegments().values()) {
+                var segmentName = SegmentTopicName.fromParent(
+                        parentTopic, segment.hashRange(), segment.segmentId());
+                pendingCreates.add(
+                        adminClient.scalableTopics().createSegmentAsync(segmentName.toString(), List.of()));
+            }
+            return FutureUtil.waitForAll(pendingCreates);
+        } catch (Exception e) {
+            // Surface synchronous failures (e.g. admin client unavailable) through the future
+            // so the caller's async chain handles them like any other error.
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
// --- Get metadata ---
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 507119518b0..189e632b7e3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -41,6 +42,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.CustomLog;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
@@ -98,7 +100,11 @@ public class Segments extends AdminResource {
validateSuperUserAccessAsync()
.thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
- .thenCompose(__ ->
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
+ // Explicit create — segments don't go through the auto-create
policy
+ // (BrokerService.isAllowAutoTopicCreationAsync forbids
segment auto-create).
+ .thenCompose(__ -> pulsar().getBrokerService()
+ .getTopic(segmentTopic.toString(), true)
+ .thenApply(Optional::get))
.thenCompose(topic -> {
log.info().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
.log("Created segment topic");
@@ -390,7 +396,8 @@ public class Segments extends AdminResource {
if (optTopic.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
- return optTopic.get().delete().thenApply(__ -> null);
+ Topic t = optTopic.get();
+ return (force ? t.deleteForcefully() :
t.delete()).thenApply(__ -> null);
})
.thenAccept(__ -> {
log.info().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7ecb6f2bdfd..71003a4d5e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -165,6 +165,7 @@ import
org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
@@ -3900,6 +3901,15 @@ public class BrokerService implements Closeable {
return CompletableFuture.completedFuture(false);
}
+ // Segment topics (PIP-468 scalable topics) are explicitly created by
the
+ // ScalableTopicController via the /admin/v2/segments endpoint. They
must
+ // never be auto-created on connect — otherwise a producer/consumer
racing
+ // a controller-driven delete (post-prune retention GC, force-delete)
would
+ // silently re-create the topic with default schema and mask the
deletion.
+ if (topicName.getDomain() == TopicDomain.segment) {
+ return CompletableFuture.completedFuture(false);
+ }
+
//Other system topics can be created automatically
if (pulsar.getConfiguration().isSystemTopicEnabled() &&
isSystemTopic(topicName)) {
return CompletableFuture.completedFuture(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 2d6c61e2256..cf43239f126 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -161,12 +161,43 @@ public class ScalableTopicController {
.thenCompose(__ -> {
if (isLeader()) {
scheduleGcTask();
- return restoreSessionsFromStore();
+ return ensureActiveSegmentsExist()
+ .thenCompose(___ ->
restoreSessionsFromStore());
}
return CompletableFuture.completedFuture(null);
});
}
+    /**
+     * Requests creation of the backing topic for every active segment of the current layout.
+     *
+     * <p>This is the recovery path for active segments whose topics are missing, for example
+     * when {@code createScalableTopic} committed metadata but crashed before all initial
+     * segments were materialized, or when an active segment was force-deleted.
+     *
+     * <p>Idempotent: {@code createSegmentAsync} on an existing segment is a no-op at the
+     * broker (it just loads the existing topic).
+     *
+     * <p>Sealed segments are deliberately left alone. If a sealed segment's topic is gone,
+     * its data is permanently gone (retention applied or an explicit delete) and an empty
+     * replacement topic would hide that. The V5 checkpoint consumer skips sealed segments
+     * whose topics return {@code TopicDoesNotExist}.
+     *
+     * @return a future that completes once every create attempt has finished; individual
+     *         failures are logged at warn level and swallowed, so they do not fail it
+     */
+    private CompletableFuture<Void> ensureActiveSegmentsExist() {
+        CompletableFuture<?>[] attempts = currentLayout.getActiveSegments().values().stream()
+                .map(segment -> createSegmentTopic(segment, List.of())
+                        .exceptionally(failure -> {
+                            // Best effort: a failed heal must not abort controller initialization.
+                            log.warn().attr("segmentId", segment.segmentId())
+                                    .exceptionMessage(failure)
+                                    .log("Failed to ensure active segment topic at controller init; "
+                                            + "next attempt to use this segment will retry");
+                            return null;
+                        }))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(attempts);
+    }
+
/**
* Schedule the periodic sealed-segment GC tick. Only fires on the
controller leader;
* idempotent (re-entry just no-ops). Cancelled on close / leader-loss.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 9da30e662d4..2eb78a8de40 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -22,7 +22,10 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -169,6 +172,49 @@ public class ScalableTopicControllerTest {
assertEquals(layout.getEpoch(), 0);
}
+    /**
+     * Covers the recovery path of {@code initialize()}. A {@code createScalableTopic} call
+     * can commit metadata and then crash before every initial segment topic exists, so the
+     * next controller initialization (broker restart, bundle ownership transfer, leader
+     * re-election) has to re-issue the create for each active segment. The create is
+     * idempotent, so segments that already exist are simply re-loaded.
+     */
+    @Test
+    public void testInitializeRecreatesMissingActiveSegments() throws Exception {
+        controller.initialize().get();
+
+        // Exactly one create request per initial active segment must have reached the admin client.
+        verify(scalableTopics, times(INITIAL_SEGMENTS)).createSegmentAsync(anyString(), any());
+    }
+
+    /**
+     * Negative counterpart to the recovery test: a controller that never becomes leader must
+     * NOT trigger segment-topic creation. Only the leader heals; followers just observe the
+     * layout. A real "loser" cannot easily be built in this single-broker harness, so the
+     * test instead makes {@code initialize()} fail before leader election by pointing the
+     * controller at a topic with no metadata, and then checks that no segment create was
+     * issued at all.
+     */
+    @Test
+    public void testInitializeDoesNotCreateSegmentsWhenNotLeader() throws Exception {
+        // Drive a "not leader" outcome by pointing at a topic with no metadata —
+        // initialize() bails before electLeader() / ensureActiveSegmentsExist().
+        TopicName missing = TopicName.get("topic://tenant/ns/does-not-exist");
+        ScalableTopicController orphan = newController(missing);
+        try {
+            orphan.initialize().get();
+            fail("expected IllegalStateException for missing topic");
+        } catch (ExecutionException expected) {
+            // Bailed before leader-elect, as desired.
+        } finally {
+            orphan.close().join();
+        }
+        // Segment creates are issued with a segment topic name derived from the parent topic,
+        // hash range and segment id, so matching on the parent topic name could never fail.
+        // Match any name so an unexpected create is actually detected.
+        verify(scalableTopics, times(0)).createSegmentAsync(anyString(), any());
+    }
+
@Test
public void testInitializeFailsWhenTopicMissing() throws Exception {
TopicName missing = TopicName.get("topic://tenant/ns/does-not-exist");
@@ -344,6 +390,9 @@ public class ScalableTopicControllerTest {
@Test
public void testSplitSegment() throws Exception {
controller.initialize().get();
+ // initialize() now eagerly creates the active segment topics for
recovery —
+ // ignore those when counting what splitSegment itself triggered.
+ clearInvocations(scalableTopics);
SegmentLayout before = controller.getLayout().get();
long epochBefore = before.getEpoch();
int activeBefore = before.getActiveSegments().size();
@@ -362,6 +411,7 @@ public class ScalableTopicControllerTest {
@Test
public void testMergeSegments() throws Exception {
controller.initialize().get();
+ clearInvocations(scalableTopics);
SegmentLayout before = controller.getLayout().get();
long epochBefore = before.getEpoch();
int activeBefore = before.getActiveSegments().size();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java
new file mode 100644
index 00000000000..cd63fabc28c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerSkipDeletedSegmentTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a {@link CheckpointConsumer} keeps working when the backing topic of one
+ * of its segments no longer exists.
+ *
+ * <p>The controller's GC eventually deletes pruned segment topics once their retention
+ * window has expired, and a checkpoint consumer does not dictate retention. When it restores
+ * against a DAG that still references such a segment, that segment's data is genuinely gone:
+ * the consumer must skip it silently and deliver everything still on disk — the same
+ * observable shape as in-segment retention.
+ */
+public class V5CheckpointConsumerSkipDeletedSegmentTest extends V5ClientBaseTest {
+
+    /**
+     * Force-deletes one of two segments and then opens a checkpoint consumer, which must
+     * subscribe to the surviving segment, silently skip the deleted one, and deliver all
+     * messages still on disk.
+     *
+     * <p>Segment topics are never auto-created (only the controller materializes them, see
+     * {@code BrokerService.isAllowAutoTopicCreationAsync}), so the delete sticks: the
+     * consumer's subscribe attempt against the missing segment surfaces as
+     * {@code TopicDoesNotExist}, which the consumer has to swallow.
+     */
+    @Test
+    public void testConsumerSkipsDeletedSegmentSilently() throws Exception {
+        String topic = newScalableTopic(2);
+        long deletedSegmentId = 0L;
+        long survivorSegmentId = 1L;
+
+        // Publish with many distinct keys so that both segments receive messages. The
+        // per-segment counts are read back from the broker afterwards.
+        int n = 100;
+        @Cleanup
+        Producer<byte[]> producer = v5Client.newProducer(Schema.bytes())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value(("msg-" + i).getBytes()).send();
+        }
+        producer.close();
+
+        // Ground truth: the number of messages the consumer should see after the delete.
+        long survivorBacklog = segmentBacklog(topic, survivorSegmentId);
+        long deletedBacklog = segmentBacklog(topic, deletedSegmentId);
+        assertTrue(survivorBacklog > 0,
+                "test setup: hash distribution put no messages on the survivor segment");
+        assertTrue(deletedBacklog > 0,
+                "test setup: hash distribution put no messages on the segment to be deleted");
+
+        // Force-deleting the backing topic is the deterministic stand-in for the
+        // controller's GC pruning the segment after retention expired.
+        String victimTopicName = segmentTopicName(topic, deletedSegmentId);
+        admin.scalableTopics().deleteSegment(victimTopicName, true);
+
+        // The DAG still lists segment 0 (only the topic was deleted, not the metadata), so
+        // the consumer tries to subscribe to it and gets TopicDoesNotExist. Creating the
+        // consumer must NOT throw; the survivor's reader keeps delivering.
+        @Cleanup
+        CheckpointConsumer<byte[]> consumer = v5Client.newCheckpointConsumer(Schema.bytes())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int delivered = 0;
+        long giveUpAt = System.currentTimeMillis() + 30_000L;
+        while (delivered < survivorBacklog && System.currentTimeMillis() < giveUpAt) {
+            // receive() yields null when nothing arrived within the poll interval.
+            if (consumer.receive(Duration.ofMillis(500)) != null) {
+                delivered++;
+            }
+        }
+        assertEquals(delivered, survivorBacklog,
+                "every survivor-segment message must still be delivered");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Builds the backing-topic name of a segment from the parent topic, the segment's hash
+     * range (looked up through the admin API) and its id.
+     */
+    private String segmentTopicName(String topic, long segmentId) throws Exception {
+        return SegmentTopicName.fromParent(
+                org.apache.pulsar.common.naming.TopicName.get(topic),
+                segmentHashRange(topic, segmentId), segmentId).toString();
+    }
+
+    /**
+     * Returns the in-message counter of a segment's backing topic, used as ground truth for
+     * what the consumer is expected to deliver. Reads the broker's in-process topic
+     * reference because segment topics aren't exposed via the regular
+     * {@code /admin/v2/persistent/.../stats} endpoint.
+     */
+    private long segmentBacklog(String topic, long segmentId) throws Exception {
+        String segmentTopic = segmentTopicName(topic, segmentId);
+        var loadedTopic = getTopicReference(segmentTopic).orElseThrow(
+                () -> new AssertionError("segment topic not loaded: " + segmentTopic));
+        return loadedTopic.getStats(false, false, false).getMsgInCounter();
+    }
+
+    /**
+     * Looks up a segment's hash range in the topic metadata returned by the admin API and
+     * converts it to the broker-internal {@code HashRange}.
+     */
+    private org.apache.pulsar.common.scalable.HashRange segmentHashRange(String topic, long segmentId)
+            throws Exception {
+        var segment = admin.scalableTopics().getMetadata(topic).getSegments().get(segmentId);
+        if (segment == null) {
+            throw new AssertionError("segment " + segmentId + " not found in DAG of " + topic);
+        }
+        // Admin-client SegmentInfo / HashRange are wire-payload @Data classes, distinct
+        // from the broker-internal records — convert across the boundary.
+        var wireRange = segment.getHashRange();
+        return org.apache.pulsar.common.scalable.HashRange.of(
+                wireRange.getStart(), wireRange.getEnd());
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 1eedbb14566..4bdefb216a7 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -354,9 +354,38 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
.thenApply(reader -> {
startReadLoop(reader, segment.segmentId());
return reader;
+ })
+ .exceptionally(ex -> {
+ Throwable cause = ex instanceof CompletionException ce &&
ce.getCause() != null
+ ? ce.getCause() : ex;
+ if (isSegmentGoneError(cause)) {
+ // The backing topic was deleted by the controller's
GC after its
+ // retention window elapsed. The consumer may be
restoring from a
+ // checkpoint that pre-dates the prune, or racing a
layout update.
+ // Either way, the segment's data is gone — skip it
silently.
+ log.info().attr("segmentId", segment.segmentId())
+ .log("Segment backing topic deleted (retention
expired); skipping");
+ segmentReaders.remove(segment.segmentId());
+ lastReceivedPositions.remove(segment.segmentId());
+ return null;
+ }
+ throw ex instanceof CompletionException
+ ? (CompletionException) ex : new
CompletionException(ex);
});
}
+    /**
+     * Tells whether {@code cause} means the segment's backing topic no longer exists, for
+     * example because the controller's GC deleted it, or because a layout update raced the
+     * post-prune topic delete.
+     *
+     * @param cause the already-unwrapped failure to classify
+     * @return {@code true} for {@code TopicDoesNotExistException} and
+     *         {@code NotFoundException}, {@code false} for anything else (including null)
+     */
+    private static boolean isSegmentGoneError(Throwable cause) {
+        if (cause instanceof org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException) {
+            return true;
+        }
+        return cause instanceof org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
+    }
+
private org.apache.pulsar.client.api.MessageId resolveStartPosition(long
segmentId) {
if (startPosition instanceof CheckpointV5 cp) {
var pos = cp.segmentPositions().get(segmentId);
@@ -404,6 +433,22 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
log.info().attr("segmentId", segmentId)
.log("Sealed segment drained, closing reader");
segmentReaders.remove(segmentId);
+ lastReceivedPositions.remove(segmentId);
+ reader.closeAsync();
+ return null;
+ }
+ if (isSegmentGoneError(cause)) {
+ // Segment backing topic deleted underneath us — the
controller's GC
+ // pruned it after retention expired. The DagWatch layout
update would
+ // normally remove this segment from the assignment first, but
a
+ // network blip or in-flight read can lose that race. Treat as
+ // drained: close the reader and drop the position tracker so a
+ // subsequent checkpoint() doesn't carry a stale ID for a topic
+ // that no longer exists.
+ log.info().attr("segmentId", segmentId)
+ .log("Segment backing topic deleted (retention
expired); closing reader");
+ segmentReaders.remove(segmentId);
+ lastReceivedPositions.remove(segmentId);
reader.closeAsync();
return null;
}