This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5b97483a198 [feat] PIP-468: segment-aware admin endpoints for cursor
lifecycle (#25717)
5b97483a198 is described below
commit 5b97483a1981fa3cdd8748a6db022a6ef31a44b1
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 14:41:37 2026 -0700
[feat] PIP-468: segment-aware admin endpoints for cursor lifecycle (#25717)
---
.../pulsar/broker/admin/v2/ScalableTopics.java | 24 ++---
.../apache/pulsar/broker/admin/v2/Segments.java | 101 +++++++++++++++++++++
.../service/scalable/ScalableTopicController.java | 26 ++----
.../scalable/ScalableTopicControllerTest.java | 19 ++--
.../api/v5/V5ScalableSubscriptionAdminTest.java | 91 +++++++++++++++++++
.../apache/pulsar/client/admin/ScalableTopics.java | 26 ++++++
.../client/admin/internal/ScalableTopicsImpl.java | 22 +++++
7 files changed, 263 insertions(+), 46 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 212d0019119..6b73fdf00d6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -704,8 +703,9 @@ public class ScalableTopics extends AdminResource {
}
/**
- * Best-effort delete underlying persistent topics for all segments.
- * Uses the internal admin client which handles cross-broker routing.
+ * Best-effort delete the underlying topic for every segment in the DAG.
Uses the
+ * segment-aware admin endpoint, which routes to the segment-owning broker
via the
+ * standard bundle-ownership lookup.
*/
private CompletableFuture<Void> deleteSegmentTopics(TopicName parentTopic,
ScalableTopicMetadata
metadata,
@@ -714,10 +714,11 @@ public class ScalableTopics extends AdminResource {
var admin = pulsar().getAdminClient();
CompletableFuture<?>[] futures =
metadata.getSegments().values().stream()
.map(seg -> {
- String name = segmentPersistentName(parentTopic, seg);
- return admin.topics().deleteAsync(name, force)
+ String segmentTopicName = SegmentTopicName.fromParent(
+ parentTopic, seg.hashRange(),
seg.segmentId()).toString();
+ return
admin.scalableTopics().deleteSegmentAsync(segmentTopicName, force)
.exceptionally(ex -> {
- log.warn().attr("segment",
name).exceptionMessage(ex)
+ log.warn().attr("segment",
segmentTopicName).exceptionMessage(ex)
.log("Failed to delete segment
topic");
return null;
});
@@ -730,15 +731,4 @@ public class ScalableTopics extends AdminResource {
return CompletableFuture.completedFuture(null);
}
}
-
- /**
- * Convert a segment:// topic name to persistent:// for the underlying
managed ledger topic.
- */
- private String segmentPersistentName(TopicName parentTopic, SegmentInfo
segment) {
- TopicName segTopic = SegmentTopicName.fromParent(
- parentTopic, segment.hashRange(), segment.segmentId());
- return "persistent://" + segTopic.getTenant() + "/"
- + segTopic.getNamespacePortion() + "/"
- + segTopic.getLocalName();
- }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 189e632b7e3..e8d4eeff654 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -181,6 +181,107 @@ public class Segments extends AdminResource {
});
}
+ @PUT
+
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
+ @ApiOperation(value = "Create a subscription cursor on the segment topic
at the earliest"
+ + " position. Super-user only.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Subscription cursor created
(or already existed)"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void createSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify the parent topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)",
required = true)
+ @PathParam("descriptor") String descriptor,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateNamespaceName(tenant, namespace);
+ TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
+
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
+ .thenCompose(__ ->
pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString()))
+ .thenCompose(topic -> topic.createSubscription(subscription,
+ CommandSubscribe.InitialPosition.Earliest, false,
null))
+ .thenAccept(__ -> {
+ log.info().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .log("Created subscription on segment topic");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .exception(ex).log("Failed to create subscription
on segment");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @DELETE
+
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}")
+ @ApiOperation(value = "Delete a subscription cursor on the segment topic.
Super-user only.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Subscription cursor deleted
(or never existed)"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void deleteSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify the parent topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)",
required = true)
+ @PathParam("descriptor") String descriptor,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateNamespaceName(tenant, namespace);
+ TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
+
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
+ .thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+ .thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ // Topic not loaded → no cursor to delete. Idempotent
success.
+ return CompletableFuture.completedFuture(null);
+ }
+ var sub = optTopic.get().getSubscription(subscription);
+ if (sub == null) {
+ // Subscription doesn't exist on this segment —
idempotent success.
+ return CompletableFuture.completedFuture(null);
+ }
+ return sub.delete();
+ })
+ .thenAccept(__ -> {
+ log.info().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .log("Deleted subscription on segment topic");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .exception(ex).log("Failed to delete subscription
on segment");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog")
@ApiOperation(value = "Number of unconsumed entries in the segment topic
for the "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index cf43239f126..44d2230e49e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -729,11 +729,11 @@ public class ScalableTopicController {
}
private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo
segment, String subscription) {
- String persistentName = toSegmentUnderlyingPersistentName(segment);
+ String segmentTopicName = toSegmentPersistentName(segment);
try {
return brokerService.getPulsar().getAdminClient()
- .topics().createSubscriptionAsync(persistentName,
subscription,
- org.apache.pulsar.client.api.MessageId.earliest)
+ .scalableTopics()
+ .createSegmentSubscriptionAsync(segmentTopicName,
subscription)
.exceptionally(ex -> {
Throwable cause =
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException.ConflictException) {
@@ -748,17 +748,18 @@ public class ScalableTopicController {
}
private CompletableFuture<Void> deleteSubscriptionOnSegment(SegmentInfo
segment, String subscription) {
- String persistentName = toSegmentUnderlyingPersistentName(segment);
+ String segmentTopicName = toSegmentPersistentName(segment);
try {
return brokerService.getPulsar().getAdminClient()
- .topics().deleteSubscriptionAsync(persistentName,
subscription, true)
+ .scalableTopics()
+ .deleteSegmentSubscriptionAsync(segmentTopicName,
subscription)
.exceptionally(ex -> {
Throwable cause =
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) {
return null;
}
log.warn().attr("subscription", subscription)
- .attr("segment",
persistentName).exceptionMessage(cause)
+ .attr("segment",
segmentTopicName).exceptionMessage(cause)
.log("Failed to delete subscription from
segment");
return null;
});
@@ -1114,19 +1115,6 @@ public class ScalableTopicController {
return segmentTopicName.toString();
}
- /**
- * Return the {@code persistent://} form of a segment's underlying
managed-ledger topic,
- * suitable for the standard {@link org.apache.pulsar.client.admin.Topics}
admin API.
- * The segment-owning broker is discovered by the admin client's normal
bundle routing.
- */
- private String toSegmentUnderlyingPersistentName(SegmentInfo segment) {
- TopicName segmentTopicName = SegmentTopicName.fromParent(
- topicName, segment.hashRange(), segment.segmentId());
- return "persistent://" + segmentTopicName.getTenant() + "/"
- + segmentTopicName.getNamespacePortion() + "/"
- + segmentTopicName.getLocalName();
- }
-
private CompletableFuture<Void> terminateSegmentTopic(String
segmentTopicName) {
try {
return brokerService.getPulsar().getAdminClient()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 2eb78a8de40..183e66b68d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.admin.Topics;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -121,14 +120,14 @@ public class ScalableTopicControllerTest {
// Default: all admin ops succeed.
when(topics.getSubscriptionsAsync(anyString()))
.thenReturn(CompletableFuture.completedFuture(java.util.List.of()));
- when(topics.createSubscriptionAsync(anyString(), anyString(),
any(MessageId.class)))
- .thenReturn(CompletableFuture.completedFuture(null));
- when(topics.deleteSubscriptionAsync(anyString(), anyString(),
anyBoolean()))
- .thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.createSegmentAsync(anyString(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(scalableTopics.terminateSegmentAsync(anyString()))
.thenReturn(CompletableFuture.completedFuture(null));
+ when(scalableTopics.createSegmentSubscriptionAsync(anyString(),
anyString()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(scalableTopics.deleteSegmentSubscriptionAsync(anyString(),
anyString()))
+ .thenReturn(CompletableFuture.completedFuture(null));
controller = newController(topicName);
}
@@ -337,9 +336,9 @@ public class ScalableTopicControllerTest {
resources.getSubscriptionAsync(topicName, "sub-stream").get();
assertTrue(persisted.isPresent());
assertEquals(persisted.get().type(), SubscriptionType.STREAM);
- // Propagated to every active segment via
admin.topics().createSubscriptionAsync().
- verify(topics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
- .createSubscriptionAsync(anyString(), anyString(),
any(MessageId.class));
+ // Propagated to every active segment via the segment-subscription
admin endpoint.
+ verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
+ .createSegmentSubscriptionAsync(anyString(), anyString());
}
@Test
@@ -370,8 +369,8 @@ public class ScalableTopicControllerTest {
controller.deleteSubscription("sub-a").get();
assertFalse(resources.getSubscriptionAsync(topicName,
"sub-a").get().isPresent());
// Propagated a delete to every segment (all segments incl. any sealed
ones).
- verify(topics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
- .deleteSubscriptionAsync(anyString(), anyString(),
anyBoolean());
+ verify(scalableTopics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS))
+ .deleteSegmentSubscriptionAsync(anyString(), anyString());
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
new file mode 100644
index 00000000000..d55495999d5
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableSubscriptionType;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().createSubscription(...)}: the
admin
+ * API must materialize a cursor on every active segment so a consumer that
+ * subscribes <em>after</em> messages are produced still receives those
+ * messages — the whole point of pre-creating the subscription.
+ *
+ * <p>The behavioural assertion (a late consumer sees pre-subscription
messages)
+ * is the user-facing guarantee, and any regression in
+ * {@code ScalableTopicController.createSubscriptionOnSegment} — which converts
+ * each {@code SegmentInfo} to the underlying {@code persistent://} topic and
+ * pre-creates the cursor through the standard topic admin API — would surface
+ * here as messages going missing.
+ */
+public class V5ScalableSubscriptionAdminTest extends V5ClientBaseTest {
+
+ @Test
+ public void testPreCreatedSubscriptionRetainsPreProductionMessages()
throws Exception {
+ String topic = newScalableTopic(3);
+ String subscription = "pre-created-sub";
+
+ // Pre-create the subscription on the scalable topic. This must
materialize a
+ // cursor on every active segment so that subsequent produces are
retained
+ // until the consumer drains them.
+ admin.scalableTopics().createSubscription(topic, subscription,
+ ScalableSubscriptionType.QUEUE);
+
+ // Produce *before* any consumer subscribes.
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ int n = 30;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "msg-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Subscribe with the SAME subscription name. If createSubscription
truly
+ // pre-created cursors on every segment, the consumer must receive
every
+ // message produced above. If it didn't, the consumer attaches at
"latest"
+ // by default and drops the entire backlog.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe();
+
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < n && System.currentTimeMillis() < deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ }
+ assertEquals(received, sent,
+ "pre-created subscription must retain every message produced
before consumer subscribed");
+ }
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index 10da169d14e..f9fb01444a6 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -313,6 +313,32 @@ public interface ScalableTopics {
*/
CompletableFuture<Void> deleteSegmentAsync(String segmentTopic, boolean
force);
+ /**
+ * Create a subscription cursor on the given segment topic at the earliest
position.
+ * The call routes to the broker that owns the segment.
+ *
+ * <p>Used internally by {@link
org.apache.pulsar.broker.service.scalable.ScalableTopicController
+ * ScalableTopicController} to fan a new scalable-topic subscription out
across every
+ * active segment so a future consumer doesn't drop the backlog.
+ *
+ * @param segmentTopic Full segment topic name ({@code
segment://tenant/namespace/topic/descriptor})
+ * @param subscription Subscription name
+ */
+ CompletableFuture<Void> createSegmentSubscriptionAsync(String
segmentTopic, String subscription);
+
+ /**
+ * Delete a subscription cursor on the given segment topic. The call
routes to the broker
+ * that owns the segment.
+ *
+ * <p>Used internally by {@link
org.apache.pulsar.broker.service.scalable.ScalableTopicController
+ * ScalableTopicController} when a scalable-topic subscription is deleted,
so no orphan
+ * cursors remain on any segment in the DAG.
+ *
+ * @param segmentTopic Full segment topic name ({@code
segment://tenant/namespace/topic/descriptor})
+ * @param subscription Subscription name
+ */
+ CompletableFuture<Void> deleteSegmentSubscriptionAsync(String
segmentTopic, String subscription);
+
/**
* Returns the number of unconsumed entries in the given subscription's
cursor on the
* segment topic — i.e. the per-subscription backlog. The call routes to
the broker
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 5a040123d2e..57d26c7011a 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -290,6 +290,28 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncDeleteRequest(path);
}
+ @Override
+ public CompletableFuture<Void> createSegmentSubscriptionAsync(String
segmentTopic,
+ String
subscription) {
+ TopicName tn = TopicName.get(segmentTopic);
+ WebTarget path = adminSegments
+ .path(tn.getTenant()).path(tn.getNamespacePortion())
+ .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+ .path("subscription").path(subscription);
+ return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSegmentSubscriptionAsync(String
segmentTopic,
+ String
subscription) {
+ TopicName tn = TopicName.get(segmentTopic);
+ WebTarget path = adminSegments
+ .path(tn.getTenant()).path(tn.getNamespacePortion())
+ .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+ .path("subscription").path(subscription);
+ return asyncDeleteRequest(path);
+ }
+
@Override
public CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String
segmentTopic,
String
subscription) {